This is an automated email from the ASF dual-hosted git repository.

vinish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git


The following commit(s) were added to refs/heads/main by this push:
     new 2e71e156 [590] Changes in xtable-core for Hudi Catalog Sync
2e71e156 is described below

commit 2e71e156667f9304442e834410e95abd03837fc6
Author: Vamsi <[email protected]>
AuthorDate: Tue Feb 11 20:00:00 2025 +0530

    [590] Changes in xtable-core for Hudi Catalog Sync
---
 xtable-core/pom.xml                                |   6 +
 .../{CatalogUtils.java => CatalogPartition.java}   |  36 +-
 .../xtable/catalog/CatalogPartitionEvent.java      |  49 +++
 .../catalog/CatalogPartitionSyncOperations.java    |  99 +++++
 ...logUtils.java => CatalogPartitionSyncTool.java} |  31 +-
 .../org/apache/xtable/catalog/CatalogUtils.java    |  31 ++
 .../apache/xtable/delta/DeltaConversionTarget.java |   3 +-
 .../apache/xtable/delta/DeltaSchemaExtractor.java  |  96 -----
 .../xtable/exception/CatalogSyncException.java     |   4 +
 .../org/apache/xtable/hudi/HudiTableManager.java   |   4 +-
 .../hudi/catalog/HudiCatalogPartitionSyncTool.java | 471 +++++++++++++++++++++
 .../xtable/hudi/catalog/HudiCatalogTableUtils.java | 116 +++++
 .../xtable/hudi/catalog/HudiInputFormatUtils.java  | 112 +++++
 .../apache/xtable/schema/SparkSchemaExtractor.java | 133 ++++++
 .../xtable/delta/TestDeltaSchemaExtractor.java     | 134 ------
 .../catalog/TestHudiCatalogPartitionSyncTool.java  | 323 ++++++++++++++
 .../hudi/catalog/TestHudiCatalogTableUtils.java    | 144 +++++++
 .../hudi/catalog/TestHudiInputFormatUtils.java     | 118 ++++++
 .../TestSparkSchemaExtractor.java}                 | 258 ++---------
 .../org/apache/xtable/testutil/ITTestUtils.java    |   2 +
 20 files changed, 1678 insertions(+), 492 deletions(-)

diff --git a/xtable-core/pom.xml b/xtable-core/pom.xml
index 68d7a176..24bc31df 100644
--- a/xtable-core/pom.xml
+++ b/xtable-core/pom.xml
@@ -82,6 +82,12 @@
             <groupId>org.apache.hudi</groupId>
             <artifactId>hudi-java-client</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.hudi</groupId>
+            <artifactId>hudi-sync-common</artifactId>
+            <version>${hudi.version}</version>
+            <scope>provided</scope>
+        </dependency>
 
         <!-- Iceberg dependencies -->
         <dependency>
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogUtils.java 
b/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogPartition.java
similarity index 52%
copy from xtable-core/src/main/java/org/apache/xtable/catalog/CatalogUtils.java
copy to 
xtable-core/src/main/java/org/apache/xtable/catalog/CatalogPartition.java
index 2e00ce1d..f2dfd4d3 100644
--- a/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogUtils.java
+++ b/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogPartition.java
@@ -18,21 +18,31 @@
  
 package org.apache.xtable.catalog;
 
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
+import java.util.List;
 
-import org.apache.xtable.model.catalog.CatalogTableIdentifier;
-import org.apache.xtable.model.catalog.HierarchicalTableIdentifier;
+import lombok.Getter;
 
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public class CatalogUtils {
+/**
+ * Encapsulates a set of partition values and the storage location where the data for this
+ * partition resides.
+ */
+@Getter
+public class CatalogPartition {
+
+  /**
+   * A list of values defining this partition. For example, these values might correspond to
+   * partition keys in a dataset (e.g., year, month, day).
+   */
+  private final List<String> values;
+
+  /**
+   * The storage location associated with this partition. Typically, this would be a path in a
+   * file system or object store.
+   */
+  private final String storageLocation;
 
-  public static HierarchicalTableIdentifier toHierarchicalTableIdentifier(
-      CatalogTableIdentifier tableIdentifier) {
-    if (tableIdentifier instanceof HierarchicalTableIdentifier) {
-      return (HierarchicalTableIdentifier) tableIdentifier;
-    }
-    throw new IllegalArgumentException(
-        "Invalid tableIdentifier implementation: " + 
tableIdentifier.getClass().getName());
+  public CatalogPartition(List<String> values, String storageLocation) {
+    this.values = values;
+    this.storageLocation = storageLocation;
   }
 }
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogPartitionEvent.java
 
b/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogPartitionEvent.java
new file mode 100644
index 00000000..2e58caa0
--- /dev/null
+++ 
b/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogPartitionEvent.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.catalog;
+
+/** Partition event capturing a partition that needs to be added, updated, or dropped. */
+public class CatalogPartitionEvent {
+
+  public enum PartitionEventType {
+    ADD,
+    UPDATE,
+    DROP
+  }
+
+  public PartitionEventType eventType;
+  public String storagePartition;
+
+  CatalogPartitionEvent(PartitionEventType eventType, String storagePartition) {
+    this.eventType = eventType;
+    this.storagePartition = storagePartition;
+  }
+
+  public static CatalogPartitionEvent newPartitionAddEvent(String storagePartition) {
+    return new CatalogPartitionEvent(PartitionEventType.ADD, storagePartition);
+  }
+
+  public static CatalogPartitionEvent newPartitionUpdateEvent(String storagePartition) {
+    return new CatalogPartitionEvent(PartitionEventType.UPDATE, storagePartition);
+  }
+
+  public static CatalogPartitionEvent newPartitionDropEvent(String storagePartition) {
+    return new CatalogPartitionEvent(PartitionEventType.DROP, storagePartition);
+  }
+}
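
As a hypothetical illustration (the partition values, paths, and example class below are made up, not from this commit), a caller could construct partition entries and events like this:

    import java.util.Arrays;

    import org.apache.xtable.catalog.CatalogPartition;
    import org.apache.xtable.catalog.CatalogPartitionEvent;

    public class PartitionEventExample {
      public static void main(String[] args) {
        // A partition for year=2025/month=02 stored under a hypothetical table base path.
        CatalogPartition partition =
            new CatalogPartition(
                Arrays.asList("2025", "02"), "s3://bucket/table/year=2025/month=02");

        // Events describing what should change in the catalog for given storage partitions.
        CatalogPartitionEvent add =
            CatalogPartitionEvent.newPartitionAddEvent("year=2025/month=02");
        CatalogPartitionEvent drop =
            CatalogPartitionEvent.newPartitionDropEvent("year=2024/month=12");
        System.out.println(
            partition.getStorageLocation() + " " + add.eventType + " " + drop.eventType);
      }
    }
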
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogPartitionSyncOperations.java
 
b/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogPartitionSyncOperations.java
new file mode 100644
index 00000000..5b3266cc
--- /dev/null
+++ 
b/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogPartitionSyncOperations.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.catalog;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.xtable.model.catalog.CatalogTableIdentifier;
+
+/**
+ * Defines operations for managing partitions in an external catalog.
+ *
+ * <p>This interface provides methods to perform CRUD (Create, Read, Update, 
Delete) operations on
+ * partitions associated with a table in an external catalog system.
+ */
+public interface CatalogPartitionSyncOperations {
+
+  /**
+   * Retrieves all partitions associated with the specified table.
+   *
+   * @param tableIdentifier an object identifying the table whose partitions 
are to be fetched.
+   * @return a list of {@link CatalogPartition} objects representing all 
partitions of the specified
+   *     table.
+   */
+  List<CatalogPartition> getAllPartitions(CatalogTableIdentifier 
tableIdentifier);
+
+  /**
+   * Adds new partitions to the specified table in the catalog.
+   *
+   * @param tableIdentifier an object identifying the table where partitions 
are to be added.
+   * @param partitionsToAdd a list of partitions to be added to the table.
+   */
+  void addPartitionsToTable(
+      CatalogTableIdentifier tableIdentifier, List<CatalogPartition> 
partitionsToAdd);
+
+  /**
+   * Updates the specified partitions for a table in the catalog.
+   *
+   * @param tableIdentifier an object identifying the table whose partitions 
are to be updated.
+   * @param changedPartitions a list of partitions to be updated in the table.
+   */
+  void updatePartitionsToTable(
+      CatalogTableIdentifier tableIdentifier, List<CatalogPartition> 
changedPartitions);
+
+  /**
+   * Removes the specified partitions from a table in the catalog.
+   *
+   * @param tableIdentifier an object identifying the table from which 
partitions are to be dropped.
+   * @param partitionsToDrop a list of partitions to be removed from the table.
+   */
+  void dropPartitions(
+      CatalogTableIdentifier tableIdentifier, List<CatalogPartition> 
partitionsToDrop);
+
+  /**
+   * Retrieves the properties indicating the last synchronization state for 
the given table.
+   *
+   * <p>This method provides a default implementation that returns an empty 
map. Implementations of
+   * this interface can override it to fetch the actual last synced properties 
from a catalog.
+   *
+   * @param tableIdentifier the identifier of the table whose last synced 
properties are to be
+   *     fetched.
+   * @param keysToRetrieve a list of keys representing the specific properties 
to retrieve.
+   * @return a map of key-value pairs representing the last synchronization 
properties.
+   */
+  default Map<String, String> getTableProperties(
+      CatalogTableIdentifier tableIdentifier, List<String> keysToRetrieve) {
+    return new HashMap<>();
+  }
+
+  /**
+   * Updates the properties indicating the last synchronization state for the 
given table.
+   *
+   * <p>This method provides a default implementation that performs no 
operation. Implementations of
+   * this interface can override it to update the last synced properties in a 
catalog.
+   *
+   * @param tableIdentifier the identifier of the table whose last synced 
properties are to be
+   *     updated.
+   * @param propertiesToUpdate a map of key-value pairs representing the 
updated properties.
+   */
+  default void updateTableProperties(
+      CatalogTableIdentifier tableIdentifier, Map<String, String> 
propertiesToUpdate) {}
+}
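
A rough sketch, assuming a single table and an in-memory map, of what an implementation of this interface could look like; a real catalog integration (e.g. Glue or HMS) would issue catalog API calls instead and may also override the table-property defaults. The class below is hypothetical:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.xtable.catalog.CatalogPartition;
    import org.apache.xtable.catalog.CatalogPartitionSyncOperations;
    import org.apache.xtable.model.catalog.CatalogTableIdentifier;

    // Hypothetical in-memory implementation, useful mainly for tests.
    public class InMemoryPartitionSyncOperations implements CatalogPartitionSyncOperations {
      // Keyed by storage location so updates and drops can find existing entries.
      private final Map<String, CatalogPartition> partitionsByLocation = new HashMap<>();

      @Override
      public List<CatalogPartition> getAllPartitions(CatalogTableIdentifier tableIdentifier) {
        return new ArrayList<>(partitionsByLocation.values());
      }

      @Override
      public void addPartitionsToTable(
          CatalogTableIdentifier tableIdentifier, List<CatalogPartition> partitionsToAdd) {
        partitionsToAdd.forEach(p -> partitionsByLocation.put(p.getStorageLocation(), p));
      }

      @Override
      public void updatePartitionsToTable(
          CatalogTableIdentifier tableIdentifier, List<CatalogPartition> changedPartitions) {
        changedPartitions.forEach(p -> partitionsByLocation.put(p.getStorageLocation(), p));
      }

      @Override
      public void dropPartitions(
          CatalogTableIdentifier tableIdentifier, List<CatalogPartition> partitionsToDrop) {
        partitionsToDrop.forEach(p -> partitionsByLocation.remove(p.getStorageLocation()));
      }
    }
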
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogUtils.java 
b/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogPartitionSyncTool.java
similarity index 51%
copy from xtable-core/src/main/java/org/apache/xtable/catalog/CatalogUtils.java
copy to 
xtable-core/src/main/java/org/apache/xtable/catalog/CatalogPartitionSyncTool.java
index 2e00ce1d..57f2f132 100644
--- a/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogUtils.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogPartitionSyncTool.java
@@ -18,21 +18,24 @@
  
 package org.apache.xtable.catalog;
 
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-
+import org.apache.xtable.model.InternalTable;
 import org.apache.xtable.model.catalog.CatalogTableIdentifier;
-import org.apache.xtable.model.catalog.HierarchicalTableIdentifier;
 
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public class CatalogUtils {
+/**
+ * Defines methods to synchronize all partitions from the storage to the 
catalog. Implementations of
+ * this interface will handle the logic for syncing partitions, including 
detecting partition
+ * changes and updating the catalog accordingly.
+ */
+public interface CatalogPartitionSyncTool {
 
-  public static HierarchicalTableIdentifier toHierarchicalTableIdentifier(
-      CatalogTableIdentifier tableIdentifier) {
-    if (tableIdentifier instanceof HierarchicalTableIdentifier) {
-      return (HierarchicalTableIdentifier) tableIdentifier;
-    }
-    throw new IllegalArgumentException(
-        "Invalid tableIdentifier implementation: " + 
tableIdentifier.getClass().getName());
-  }
+  /**
+   * Syncs all partitions on storage to the catalog.
+   *
+   * @param oneTable The object representing the table whose partitions are 
being synced. This
+   *     object contains necessary details to perform the sync operation.
+   * @param tableIdentifier The table in the catalog.
+   * @return {@code true} if one or more partition(s) are changed in the 
catalog; {@code false}
+   *     otherwise.
+   */
+  public boolean syncPartitions(InternalTable oneTable, CatalogTableIdentifier 
tableIdentifier);
 }
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogUtils.java 
b/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogUtils.java
index 2e00ce1d..7a845255 100644
--- a/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogUtils.java
+++ b/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogUtils.java
@@ -18,11 +18,20 @@
  
 package org.apache.xtable.catalog;
 
+import java.util.Optional;
+
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.hudi.sync.common.model.PartitionValueExtractor;
+
+import org.apache.xtable.hudi.catalog.HudiCatalogPartitionSyncTool;
 import org.apache.xtable.model.catalog.CatalogTableIdentifier;
 import org.apache.xtable.model.catalog.HierarchicalTableIdentifier;
+import org.apache.xtable.model.storage.TableFormat;
+import org.apache.xtable.reflection.ReflectionUtils;
 
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
 public class CatalogUtils {
@@ -35,4 +44,26 @@ public class CatalogUtils {
     throw new IllegalArgumentException(
         "Invalid tableIdentifier implementation: " + 
tableIdentifier.getClass().getName());
   }
+
+  public static Optional<CatalogPartitionSyncTool> getPartitionSyncTool(
+      String tableFormat,
+      String partitionValueExtractorClass,
+      CatalogPartitionSyncOperations catalogPartitionSyncOperations,
+      Configuration configuration) {
+
+    if (partitionValueExtractorClass.isEmpty()) {
+      return Optional.empty();
+    }
+
+    // In Iceberg and Delta, partitions are automatically synchronized with catalogs when
+    // table metadata is updated. However, for Hudi, we need to sync them manually.
+    if (tableFormat.equals(TableFormat.HUDI)) {
+      PartitionValueExtractor partitionValueExtractor =
+          ReflectionUtils.createInstanceOfClass(partitionValueExtractorClass);
+      return Optional.of(
+          new HudiCatalogPartitionSyncTool(
+              catalogPartitionSyncOperations, partitionValueExtractor, 
configuration));
+    }
+    return Optional.empty();
+  }
 }
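
A minimal usage sketch, assuming the caller already has a CatalogPartitionSyncOperations implementation; the extractor class name below is hypothetical and must name a PartitionValueExtractor implementation available on the classpath:

    import java.util.Optional;

    import org.apache.hadoop.conf.Configuration;

    import org.apache.xtable.catalog.CatalogPartitionSyncOperations;
    import org.apache.xtable.catalog.CatalogPartitionSyncTool;
    import org.apache.xtable.catalog.CatalogUtils;
    import org.apache.xtable.model.storage.TableFormat;

    public class PartitionSyncToolFactoryExample {
      static Optional<CatalogPartitionSyncTool> buildSyncTool(
          CatalogPartitionSyncOperations catalogOperations) {
        // Returns empty for Iceberg/Delta (their partitions sync with table metadata)
        // or when no partition value extractor class is configured.
        return CatalogUtils.getPartitionSyncTool(
            TableFormat.HUDI,
            "org.example.MyPartitionValueExtractor", // hypothetical extractor class name
            catalogOperations,
            new Configuration());
      }
    }
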
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java 
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java
index b34fa449..343a2d21 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java
@@ -63,6 +63,7 @@ import org.apache.xtable.model.schema.InternalSchema;
 import org.apache.xtable.model.storage.DataFilesDiff;
 import org.apache.xtable.model.storage.PartitionFileGroup;
 import org.apache.xtable.model.storage.TableFormat;
+import org.apache.xtable.schema.SparkSchemaExtractor;
 import org.apache.xtable.spi.sync.ConversionTarget;
 
 public class DeltaConversionTarget implements ConversionTarget {
@@ -247,7 +248,7 @@ public class DeltaConversionTarget implements 
ConversionTarget {
 
     private void setLatestSchema(InternalSchema schema) {
       this.latestSchemaInternal = schema;
-      this.latestSchema = schemaExtractor.fromInternalSchema(schema);
+      this.latestSchema = SparkSchemaExtractor.getInstance().fromInternalSchema(schema);
     }
 
     private void commitTransaction() {
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java 
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java
index a10ee120..d1303e84 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java
@@ -29,17 +29,13 @@ import lombok.NoArgsConstructor;
 
 import org.apache.spark.sql.types.ArrayType;
 import org.apache.spark.sql.types.DataType;
-import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.DecimalType;
 import org.apache.spark.sql.types.MapType;
 import org.apache.spark.sql.types.Metadata;
-import org.apache.spark.sql.types.MetadataBuilder;
-import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 
 import org.apache.xtable.collectors.CustomCollectors;
 import org.apache.xtable.exception.NotSupportedException;
-import org.apache.xtable.exception.SchemaExtractorException;
 import org.apache.xtable.model.schema.InternalField;
 import org.apache.xtable.model.schema.InternalSchema;
 import org.apache.xtable.model.schema.InternalType;
@@ -59,104 +55,12 @@ import org.apache.xtable.schema.SchemaUtils;
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
 public class DeltaSchemaExtractor {
   private static final String DELTA_COLUMN_MAPPING_ID = 
"delta.columnMapping.id";
-  private static final String COMMENT = "comment";
   private static final DeltaSchemaExtractor INSTANCE = new 
DeltaSchemaExtractor();
 
   public static DeltaSchemaExtractor getInstance() {
     return INSTANCE;
   }
 
-  public StructType fromInternalSchema(InternalSchema internalSchema) {
-    StructField[] fields =
-        internalSchema.getFields().stream()
-            .map(
-                field ->
-                    new StructField(
-                        field.getName(),
-                        convertFieldType(field),
-                        field.getSchema().isNullable(),
-                        getMetaData(field.getSchema())))
-            .toArray(StructField[]::new);
-    return new StructType(fields);
-  }
-
-  private DataType convertFieldType(InternalField field) {
-    switch (field.getSchema().getDataType()) {
-      case ENUM:
-      case STRING:
-        return DataTypes.StringType;
-      case INT:
-        return DataTypes.IntegerType;
-      case LONG:
-      case TIMESTAMP_NTZ:
-        return DataTypes.LongType;
-      case BYTES:
-      case FIXED:
-      case UUID:
-        return DataTypes.BinaryType;
-      case BOOLEAN:
-        return DataTypes.BooleanType;
-      case FLOAT:
-        return DataTypes.FloatType;
-      case DATE:
-        return DataTypes.DateType;
-      case TIMESTAMP:
-        return DataTypes.TimestampType;
-      case DOUBLE:
-        return DataTypes.DoubleType;
-      case DECIMAL:
-        int precision =
-            (int) 
field.getSchema().getMetadata().get(InternalSchema.MetadataKey.DECIMAL_PRECISION);
-        int scale =
-            (int) 
field.getSchema().getMetadata().get(InternalSchema.MetadataKey.DECIMAL_SCALE);
-        return DataTypes.createDecimalType(precision, scale);
-      case RECORD:
-        return fromInternalSchema(field.getSchema());
-      case MAP:
-        InternalField key =
-            field.getSchema().getFields().stream()
-                .filter(
-                    mapField ->
-                        
InternalField.Constants.MAP_KEY_FIELD_NAME.equals(mapField.getName()))
-                .findFirst()
-                .orElseThrow(() -> new SchemaExtractorException("Invalid map 
schema"));
-        InternalField value =
-            field.getSchema().getFields().stream()
-                .filter(
-                    mapField ->
-                        
InternalField.Constants.MAP_VALUE_FIELD_NAME.equals(mapField.getName()))
-                .findFirst()
-                .orElseThrow(() -> new SchemaExtractorException("Invalid map 
schema"));
-        return DataTypes.createMapType(
-            convertFieldType(key), convertFieldType(value), 
value.getSchema().isNullable());
-      case LIST:
-        InternalField element =
-            field.getSchema().getFields().stream()
-                .filter(
-                    arrayField ->
-                        
InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME.equals(
-                            arrayField.getName()))
-                .findFirst()
-                .orElseThrow(() -> new SchemaExtractorException("Invalid array 
schema"));
-        return DataTypes.createArrayType(
-            convertFieldType(element), element.getSchema().isNullable());
-      default:
-        throw new NotSupportedException("Unsupported type: " + 
field.getSchema().getDataType());
-    }
-  }
-
-  private Metadata getMetaData(InternalSchema schema) {
-    InternalType type = schema.getDataType();
-    MetadataBuilder metadataBuilder = new MetadataBuilder();
-    if (type == InternalType.UUID) {
-      metadataBuilder.putString(InternalSchema.XTABLE_LOGICAL_TYPE, "uuid");
-    }
-    if (schema.getComment() != null) {
-      metadataBuilder.putString(COMMENT, schema.getComment());
-    }
-    return metadataBuilder.build();
-  }
-
   public InternalSchema toInternalSchema(StructType structType) {
     return toInternalSchema(structType, null, false, null, null);
   }
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/exception/CatalogSyncException.java
 
b/xtable-core/src/main/java/org/apache/xtable/exception/CatalogSyncException.java
index 1960641c..5d783004 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/exception/CatalogSyncException.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/exception/CatalogSyncException.java
@@ -34,4 +34,8 @@ public class CatalogSyncException extends InternalException {
   public CatalogSyncException(String message, Throwable e) {
     super(ErrorCode.CATALOG_SYNC_GENERIC_EXCEPTION, message, e);
   }
+
+  public CatalogSyncException(String message) {
+    super(ErrorCode.CATALOG_SYNC_GENERIC_EXCEPTION, message);
+  }
 }
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiTableManager.java 
b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiTableManager.java
index 1ac1b5aa..7c48e88d 100644
--- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiTableManager.java
+++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiTableManager.java
@@ -44,7 +44,7 @@ import org.apache.xtable.model.storage.DataLayoutStrategy;
 /** A class used to initialize new Hudi tables and load the metadata of 
existing tables. */
 @Log4j2
 @RequiredArgsConstructor(staticName = "of")
-class HudiTableManager {
+public class HudiTableManager {
   private static final String NONPARTITIONED_KEY_GENERATOR =
       "org.apache.hudi.keygen.NonpartitionedKeyGenerator";
   private static final String CUSTOM_KEY_GENERATOR = 
"org.apache.hudi.keygen.CustomKeyGenerator";
@@ -61,7 +61,7 @@ class HudiTableManager {
    * @param tableDataPath the path for the table
    * @return {@link HoodieTableMetaClient} if table exists, otherwise null
    */
-  Optional<HoodieTableMetaClient> loadTableMetaClientIfExists(String 
tableDataPath) {
+  public Optional<HoodieTableMetaClient> loadTableMetaClientIfExists(String 
tableDataPath) {
     try {
       return Optional.of(
           HoodieTableMetaClient.builder()
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/hudi/catalog/HudiCatalogPartitionSyncTool.java
 
b/xtable-core/src/main/java/org/apache/xtable/hudi/catalog/HudiCatalogPartitionSyncTool.java
new file mode 100644
index 00000000..792c7063
--- /dev/null
+++ 
b/xtable-core/src/main/java/org/apache/xtable/hudi/catalog/HudiCatalogPartitionSyncTool.java
@@ -0,0 +1,471 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.hudi.catalog;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import lombok.extern.log4j.Log4j2;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.hudi.common.engine.HoodieLocalEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.hadoop.CachingPath;
+import org.apache.hudi.sync.common.model.PartitionValueExtractor;
+
+import org.apache.xtable.catalog.CatalogPartition;
+import org.apache.xtable.catalog.CatalogPartitionEvent;
+import org.apache.xtable.catalog.CatalogPartitionSyncOperations;
+import org.apache.xtable.catalog.CatalogPartitionSyncTool;
+import org.apache.xtable.exception.CatalogSyncException;
+import org.apache.xtable.hudi.HudiTableManager;
+import org.apache.xtable.model.InternalTable;
+import org.apache.xtable.model.catalog.CatalogTableIdentifier;
+
+@Log4j2
+public class HudiCatalogPartitionSyncTool implements CatalogPartitionSyncTool {
+
+  private final CatalogPartitionSyncOperations catalogClient;
+  private final HudiTableManager hudiTableManager;
+  private final PartitionValueExtractor partitionValuesExtractor;
+  private final Configuration configuration;
+
+  public static final String LAST_COMMIT_TIME_SYNC = "last_commit_time_sync";
+  public static final String LAST_COMMIT_COMPLETION_TIME_SYNC = 
"last_commit_completion_time_sync";
+
+  public HudiCatalogPartitionSyncTool(
+      CatalogPartitionSyncOperations catalogClient,
+      PartitionValueExtractor partitionValueExtractor,
+      Configuration configuration) {
+    this.catalogClient = catalogClient;
+    this.hudiTableManager = HudiTableManager.of(configuration);
+    this.partitionValuesExtractor = partitionValueExtractor;
+    this.configuration = configuration;
+  }
+
+  @VisibleForTesting
+  HudiCatalogPartitionSyncTool(
+      CatalogPartitionSyncOperations catalogClient,
+      HudiTableManager hudiTableManager,
+      PartitionValueExtractor partitionValueExtractor,
+      Configuration configuration) {
+    this.catalogClient = catalogClient;
+    this.hudiTableManager = hudiTableManager;
+    this.partitionValuesExtractor = partitionValueExtractor;
+    this.configuration = configuration;
+  }
+
+  HoodieTableMetaClient getMetaClient(String basePath) {
+    Optional<HoodieTableMetaClient> metaClientOpt =
+        hudiTableManager.loadTableMetaClientIfExists(basePath);
+
+    if (!metaClientOpt.isPresent()) {
+      throw new CatalogSyncException(
+          "Failed to get meta client since table is not present in the base path " + basePath);
+    }
+
+    return metaClientOpt.get();
+  }
+
+  /**
+   * Syncs all partitions on storage to the catalog by listing every partition under the table
+   * base path and applying only the resulting additions, updates, and drops.
+   *
+   * @param metaClient The Hudi meta client for the table.
+   * @param internalTable The internal representation of the table being synced.
+   * @param tableIdentifier The table in the catalog.
+   * @return {@code true} if one or more partition(s) are changed in the catalog; {@code false}
+   *     otherwise.
+   */
+  private boolean syncAllPartitions(
+      HoodieTableMetaClient metaClient,
+      InternalTable internalTable,
+      CatalogTableIdentifier tableIdentifier) {
+    try {
+      if (internalTable.getPartitioningFields().isEmpty()) {
+        return false;
+      }
+
+      List<CatalogPartition> allPartitionsInCatalog =
+          catalogClient.getAllPartitions(tableIdentifier);
+      List<String> allPartitionsOnStorage =
+          getAllPartitionPathsOnStorage(internalTable.getBasePath());
+      boolean partitionsChanged =
+          syncPartitions(
+              metaClient,
+              tableIdentifier,
+              getPartitionEvents(metaClient, allPartitionsInCatalog, 
allPartitionsOnStorage));
+      if (partitionsChanged) {
+        updateLastCommitTimeSynced(metaClient, tableIdentifier);
+      }
+      return partitionsChanged;
+    } catch (Exception e) {
+      throw new CatalogSyncException(
+          "Failed to sync partitions for table " + tableIdentifier.getId(), e);
+    }
+  }
+
+  /**
+   * Syncs partitions written since the last synced commit to the catalog, falling back to a full
+   * sync when the last synced commit is unknown or precedes the start of the active timeline.
+   *
+   * @param table The object representing the table whose partitions are being synced.
+   * @param tableIdentifier The table in the catalog.
+   * @return {@code true} if one or more partition(s) are changed in the catalog; {@code false}
+   *     otherwise.
+   */
+  @Override
+  public boolean syncPartitions(InternalTable table, CatalogTableIdentifier 
tableIdentifier) {
+    Map<String, String> lastCommitTimeSyncedProperties =
+        catalogClient.getTableProperties(
+            tableIdentifier,
+            Arrays.asList(LAST_COMMIT_TIME_SYNC, 
LAST_COMMIT_COMPLETION_TIME_SYNC));
+    Option<String> lastCommitTimeSynced =
+        
Option.ofNullable(lastCommitTimeSyncedProperties.get(LAST_COMMIT_TIME_SYNC));
+    Option<String> lastCommitCompletionTimeSynced =
+        
Option.ofNullable(lastCommitTimeSyncedProperties.get(LAST_COMMIT_COMPLETION_TIME_SYNC));
+    HoodieTableMetaClient metaClient = getMetaClient(table.getBasePath());
+    if (!lastCommitTimeSynced.isPresent()
+        || metaClient.getActiveTimeline().isBeforeTimelineStarts(lastCommitTimeSynced.get())) {
+      // If the last commit time synced is before the start of the active timeline,
+      // the sync falls back to listing all partitions on storage, instead of
+      // reading active and archived timelines for written partitions.
+      log.info(
+          "Sync all partitions given the last commit time synced is empty or "
+              + "before the start of the active timeline. Listing all partitions in "
+              + table.getBasePath());
+      return syncAllPartitions(metaClient, table, tableIdentifier);
+    } else {
+      List<String> writtenPartitionsSince =
+          getWrittenPartitionsSince(
+              metaClient,
+              Option.ofNullable(lastCommitTimeSynced.get()),
+              lastCommitCompletionTimeSynced);
+      log.info("Storage partitions scan complete. Found " + writtenPartitionsSince.size());
+
+      // Sync the partitions if needed
+      // find dropped partitions, if any, in the latest commit
+      Set<String> droppedPartitions =
+          getDroppedPartitionsSince(
+              metaClient,
+              Option.ofNullable(lastCommitTimeSynced.get()),
+              lastCommitCompletionTimeSynced);
+      boolean partitionsChanged =
+          syncPartitions(metaClient, tableIdentifier, writtenPartitionsSince, droppedPartitions);
+      if (partitionsChanged) {
+        updateLastCommitTimeSynced(metaClient, tableIdentifier);
+      }
+      return partitionsChanged;
+    }
+  }
+
+  private void updateLastCommitTimeSynced(
+      HoodieTableMetaClient metaClient, CatalogTableIdentifier 
tableIdentifier) {
+    HoodieTimeline activeTimeline = metaClient.getActiveTimeline();
+    Option<String> lastCommitSynced = 
activeTimeline.lastInstant().map(HoodieInstant::getTimestamp);
+    Option<String> lastCommitCompletionSynced =
+        activeTimeline
+            .getInstantsOrderedByStateTransitionTime()
+            .skip(activeTimeline.countInstants() - 1)
+            .findFirst()
+            .map(i -> Option.of(i.getStateTransitionTime()))
+            .orElse(Option.empty());
+
+    if (lastCommitSynced.isPresent()) {
+      Map<String, String> lastSyncedProperties = new HashMap<>();
+      lastSyncedProperties.put(LAST_COMMIT_TIME_SYNC, lastCommitSynced.get());
+      lastSyncedProperties.put(LAST_COMMIT_COMPLETION_TIME_SYNC, 
lastCommitCompletionSynced.get());
+      catalogClient.updateTableProperties(tableIdentifier, 
lastSyncedProperties);
+    }
+  }
+
+  /**
+   * Gets all relative partition paths in the Hudi table on storage.
+   *
+   * @param basePath The base path of the Hudi table.
+   * @return All relative partition paths.
+   */
+  public List<String> getAllPartitionPathsOnStorage(String basePath) {
+    HoodieLocalEngineContext engineContext = new HoodieLocalEngineContext(configuration);
+    // TODO: decide whether a config is needed to validate assumeDatePartitioning
+    return FSUtils.getAllPartitionPaths(engineContext, basePath, true, false);
+  }
+
+  public List<String> getWrittenPartitionsSince(
+      HoodieTableMetaClient metaClient,
+      Option<String> lastCommitTimeSynced,
+      Option<String> lastCommitCompletionTimeSynced) {
+    if (!lastCommitTimeSynced.isPresent()) {
+      String basePath = metaClient.getBasePathV2().toUri().toString();
+      log.info("Last commit time synced is not known, listing all partitions 
in " + basePath);
+      return getAllPartitionPathsOnStorage(basePath);
+    } else {
+      log.info(
+          "Last commit time synced is "
+              + lastCommitTimeSynced.get()
+              + ", Getting commits since then");
+      return TimelineUtils.getWrittenPartitions(
+          TimelineUtils.getCommitsTimelineAfter(
+              metaClient, lastCommitTimeSynced.get(), 
lastCommitCompletionTimeSynced));
+    }
+  }
+
+  /**
+   * Gets the set of dropped partitions since the last synced commit. If the last sync time is not
+   * known, only the active timeline is considered, since scanning the archived timeline is costly
+   * and should be avoided unless a start time is given.
+   */
+  private Set<String> getDroppedPartitionsSince(
+      HoodieTableMetaClient metaClient,
+      Option<String> lastCommitTimeSynced,
+      Option<String> lastCommitCompletionTimeSynced) {
+    HoodieTimeline timeline =
+        lastCommitTimeSynced.isPresent()
+            ? TimelineUtils.getCommitsTimelineAfter(
+                metaClient, lastCommitTimeSynced.get(), 
lastCommitCompletionTimeSynced)
+            : metaClient.getActiveTimeline();
+    return new HashSet<>(TimelineUtils.getDroppedPartitions(timeline));
+  }
+
+  /**
+   * Syncs added, updated, and dropped partitions to the catalog.
+   *
+   * @param tableIdentifier The table in the catalog.
+   * @param partitionEventList The partition change event list.
+   * @return {@code true} if one or more partition(s) are changed in the 
catalog; {@code false}
+   *     otherwise.
+   */
+  private boolean syncPartitions(
+      HoodieTableMetaClient metaClient,
+      CatalogTableIdentifier tableIdentifier,
+      List<CatalogPartitionEvent> partitionEventList) {
+    List<CatalogPartition> newPartitions =
+        filterPartitions(
+            metaClient.getBasePathV2(),
+            partitionEventList,
+            CatalogPartitionEvent.PartitionEventType.ADD);
+    if (!newPartitions.isEmpty()) {
+      log.info("New Partitions " + newPartitions);
+      catalogClient.addPartitionsToTable(tableIdentifier, newPartitions);
+    }
+
+    List<CatalogPartition> updatePartitions =
+        filterPartitions(
+            metaClient.getBasePathV2(),
+            partitionEventList,
+            CatalogPartitionEvent.PartitionEventType.UPDATE);
+    if (!updatePartitions.isEmpty()) {
+      log.info("Changed Partitions " + updatePartitions);
+      catalogClient.updatePartitionsToTable(tableIdentifier, updatePartitions);
+    }
+
+    List<CatalogPartition> dropPartitions =
+        filterPartitions(
+            metaClient.getBasePathV2(),
+            partitionEventList,
+            CatalogPartitionEvent.PartitionEventType.DROP);
+    if (!dropPartitions.isEmpty()) {
+      log.info("Drop Partitions " + dropPartitions);
+      catalogClient.dropPartitions(tableIdentifier, dropPartitions);
+    }
+
+    return !updatePartitions.isEmpty() || !newPartitions.isEmpty() || 
!dropPartitions.isEmpty();
+  }
+
+  private List<CatalogPartition> filterPartitions(
+      Path basePath,
+      List<CatalogPartitionEvent> events,
+      CatalogPartitionEvent.PartitionEventType eventType) {
+    return events.stream()
+        .filter(s -> s.eventType == eventType)
+        .map(
+            s ->
+                new CatalogPartition(
+                    
partitionValuesExtractor.extractPartitionValuesInPath(s.storagePartition),
+                    new Path(basePath, s.storagePartition).toUri().toString()))
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Syncs the list of storage partitions passed in: if a partition is missing from the catalog it
+   * is added, and if its path does not match the catalog entry the path is updated.
+   *
+   * @param tableIdentifier The table in the catalog.
+   * @param writtenPartitionsSince Partitions that have been added, updated, or dropped since the
+   *     last sync.
+   * @param droppedPartitions Partitions that have been dropped since the last sync.
+   * @return {@code true} if one or more partition(s) are changed in the catalog; {@code false}
+   *     otherwise.
+   */
+  private boolean syncPartitions(
+      HoodieTableMetaClient metaClient,
+      CatalogTableIdentifier tableIdentifier,
+      List<String> writtenPartitionsSince,
+      Set<String> droppedPartitions) {
+    try {
+      if (writtenPartitionsSince.isEmpty()) {
+        return false;
+      }
+
+      List<CatalogPartition> hivePartitions = 
getTablePartitions(tableIdentifier);
+      return syncPartitions(
+          metaClient,
+          tableIdentifier,
+          getPartitionEvents(
+              metaClient, hivePartitions, writtenPartitionsSince, 
droppedPartitions));
+    } catch (Exception e) {
+      throw new CatalogSyncException(
+          "Failed to sync partitions for table " + tableIdentifier.getId(), e);
+    }
+  }
+
+  /** Fetches all partitions for the table from the catalog. */
+  private List<CatalogPartition> getTablePartitions(CatalogTableIdentifier 
tableIdentifier) {
+    return catalogClient.getAllPartitions(tableIdentifier);
+  }
+
+  /**
+   * Gets the partition events for changed partitions.
+   *
+   * <p>This compares the list of all partitions of a table stored in the 
catalog and on the
+   * storage: (1) Partitions exist in the catalog, but NOT the storage: drops 
them in the catalog;
+   * (2) Partitions exist on the storage, but NOT the catalog: adds them to 
the catalog; (3)
+   * Partitions exist in both, but the partition path is different: update 
them in the catalog.
+   *
+   * @param allPartitionsInCatalog All partitions of a table stored in the 
catalog.
+   * @param allPartitionsOnStorage All partitions of a table stored on the 
storage.
+   * @return partition events for changed partitions.
+   */
+  private List<CatalogPartitionEvent> getPartitionEvents(
+      HoodieTableMetaClient metaClient,
+      List<CatalogPartition> allPartitionsInCatalog,
+      List<String> allPartitionsOnStorage) {
+    Map<String, String> paths = 
getPartitionValuesToPathMapping(allPartitionsInCatalog);
+    Set<String> partitionsToDrop = new HashSet<>(paths.keySet());
+
+    List<CatalogPartitionEvent> events = new ArrayList<>();
+    for (String storagePartition : allPartitionsOnStorage) {
+      Path storagePartitionPath =
+          FSUtils.getPartitionPath(metaClient.getBasePathV2(), 
storagePartition);
+      String fullStoragePartitionPath =
+          
Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath();
+      List<String> storagePartitionValues =
+          
partitionValuesExtractor.extractPartitionValuesInPath(storagePartition);
+
+      if (!storagePartitionValues.isEmpty()) {
+        String storageValue = String.join(", ", storagePartitionValues);
+        // Remove partitions that exist on storage from the `partitionsToDrop` 
set,
+        // so the remaining partitions that exist in the catalog should be 
dropped
+        partitionsToDrop.remove(storageValue);
+        if (!paths.containsKey(storageValue)) {
+          
events.add(CatalogPartitionEvent.newPartitionAddEvent(storagePartition));
+        } else if (!paths.get(storageValue).equals(fullStoragePartitionPath)) {
+          
events.add(CatalogPartitionEvent.newPartitionUpdateEvent(storagePartition));
+        }
+      }
+    }
+
+    partitionsToDrop.forEach(
+        storageValue -> {
+          String storagePath = paths.get(storageValue);
+          try {
+            String relativePath =
+                FSUtils.getRelativePartitionPath(
+                    metaClient.getBasePathV2(), new CachingPath(storagePath));
+            
events.add(CatalogPartitionEvent.newPartitionDropEvent(relativePath));
+          } catch (IllegalArgumentException e) {
+            log.error(
+                "Cannot parse the path stored in the catalog, ignoring it for "
+                    + "generating DROP partition event: \""
+                    + storagePath
+                    + "\".",
+                e);
+          }
+        });
+    return events;
+  }
+
+  /**
+   * Iterate over the storage partitions and find if there are any new 
partitions that need to be
+   * added or updated. Generate a list of PartitionEvent based on the changes 
required.
+   */
+  public List<CatalogPartitionEvent> getPartitionEvents(
+      HoodieTableMetaClient metaClient,
+      List<CatalogPartition> partitionsInCatalog,
+      List<String> writtenPartitionsOnStorage,
+      Set<String> droppedPartitionsOnStorage) {
+    Map<String, String> paths = 
getPartitionValuesToPathMapping(partitionsInCatalog);
+
+    List<CatalogPartitionEvent> events = new ArrayList<>();
+    for (String storagePartition : writtenPartitionsOnStorage) {
+      Path storagePartitionPath =
+          FSUtils.getPartitionPath(metaClient.getBasePathV2(), 
storagePartition);
+      String fullStoragePartitionPath =
+          
Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath();
+      List<String> storagePartitionValues =
+          
partitionValuesExtractor.extractPartitionValuesInPath(storagePartition);
+
+      if (droppedPartitionsOnStorage.contains(storagePartition)) {
+        
events.add(CatalogPartitionEvent.newPartitionDropEvent(storagePartition));
+      } else {
+        if (!storagePartitionValues.isEmpty()) {
+          String storageValue = String.join(", ", storagePartitionValues);
+          if (!paths.containsKey(storageValue)) {
+            
events.add(CatalogPartitionEvent.newPartitionAddEvent(storagePartition));
+          } else if 
(!paths.get(storageValue).equals(fullStoragePartitionPath)) {
+            
events.add(CatalogPartitionEvent.newPartitionUpdateEvent(storagePartition));
+          }
+        }
+      }
+    }
+    return events;
+  }
+
+  /**
+   * Gets the mapping from partition values to absolute paths based on the partition information
+   * from the catalog.
+   *
+   * @param partitionsInCatalog Partitions in the catalog.
+   * @return The mapping from partition values to absolute paths.
+   */
+  private Map<String, String> getPartitionValuesToPathMapping(
+      List<CatalogPartition> partitionsInCatalog) {
+    Map<String, String> paths = new HashMap<>();
+    for (CatalogPartition tablePartition : partitionsInCatalog) {
+      List<String> hivePartitionValues = tablePartition.getValues();
+      String fullTablePartitionPath =
+          Path.getPathWithoutSchemeAndAuthority(new 
Path(tablePartition.getStorageLocation()))
+              .toUri()
+              .getPath();
+      paths.put(String.join(", ", hivePartitionValues), 
fullTablePartitionPath);
+    }
+    return paths;
+  }
+}
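
A short sketch, assuming the caller supplies the catalog operations, a PartitionValueExtractor instance, and the table objects, of how the tool might be wired up and invoked; the example class is hypothetical:

    import org.apache.hadoop.conf.Configuration;

    import org.apache.hudi.sync.common.model.PartitionValueExtractor;

    import org.apache.xtable.catalog.CatalogPartitionSyncOperations;
    import org.apache.xtable.hudi.catalog.HudiCatalogPartitionSyncTool;
    import org.apache.xtable.model.InternalTable;
    import org.apache.xtable.model.catalog.CatalogTableIdentifier;

    public class HudiPartitionSyncExample {
      static boolean syncHudiPartitions(
          CatalogPartitionSyncOperations catalogOperations,
          PartitionValueExtractor extractor,
          InternalTable internalTable,
          CatalogTableIdentifier tableIdentifier) {
        HudiCatalogPartitionSyncTool syncTool =
            new HudiCatalogPartitionSyncTool(catalogOperations, extractor, new Configuration());
        // Incrementally syncs partitions written since the last synced commit,
        // falling back to a full listing when no last-synced commit is recorded.
        return syncTool.syncPartitions(internalTable, tableIdentifier);
      }
    }
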
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/hudi/catalog/HudiCatalogTableUtils.java
 
b/xtable-core/src/main/java/org/apache/xtable/hudi/catalog/HudiCatalogTableUtils.java
new file mode 100644
index 00000000..e6d99706
--- /dev/null
+++ 
b/xtable-core/src/main/java/org/apache/xtable/hudi/catalog/HudiCatalogTableUtils.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.hudi.catalog;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.spark.sql.types.StructType;
+
+import org.apache.hudi.common.util.StringUtils;
+
+import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.InternalType;
+import org.apache.xtable.schema.SparkSchemaExtractor;
+
+/** Utility class to fetch details about a Hudi table. */
+public class HudiCatalogTableUtils {
+
+  /**
+   * Gets Spark SQL related table properties, used for Spark datasource tables.
+   *
+   * @param partitionNames The partition column names of the table.
+   * @param sparkVersion The Spark version to record in the table properties.
+   * @param schemaLengthThreshold The maximum length of each schema string part.
+   * @param schema The schema to write to the table.
+   * @return The Spark table properties derived from the schema.
+   */
+  public static Map<String, String> getSparkTableProperties(
+      List<String> partitionNames,
+      String sparkVersion,
+      int schemaLengthThreshold,
+      InternalSchema schema) {
+    List<InternalField> partitionCols = new ArrayList<>();
+    List<InternalField> dataCols = new ArrayList<>();
+    Map<String, InternalField> column2Field = new HashMap<>();
+
+    for (InternalField field : schema.getFields()) {
+      column2Field.put(field.getName(), field);
+    }
+    // Get partition columns and data columns.
+    for (String partitionName : partitionNames) {
+      // Default the type of unknown partition fields.
+      // Keeps the same logic as HiveSchemaUtil#getPartitionKeyType.
+      partitionCols.add(
+          column2Field.getOrDefault(
+              partitionName,
+              InternalField.builder()
+                  .name(partitionName)
+                  .schema(
+                      InternalSchema.builder()
+                          .dataType(InternalType.BYTES)
+                          .isNullable(false)
+                          .build())
+                  .build()));
+    }
+
+    for (InternalField field : schema.getFields()) {
+      if (!partitionNames.contains(field.getName())) {
+        dataCols.add(field);
+      }
+    }
+
+    List<InternalField> reOrderedFields = new ArrayList<>();
+    reOrderedFields.addAll(dataCols);
+    reOrderedFields.addAll(partitionCols);
+    InternalSchema reorderedSchema =
+        InternalSchema.builder()
+            .fields(reOrderedFields)
+            .dataType(InternalType.RECORD)
+            .name(schema.getName())
+            .build();
+
+    StructType sparkSchema = 
SparkSchemaExtractor.getInstance().fromInternalSchema(reorderedSchema);
+
+    Map<String, String> sparkProperties = new HashMap<>();
+    sparkProperties.put("spark.sql.sources.provider", "hudi");
+    if (!StringUtils.isNullOrEmpty(sparkVersion)) {
+      sparkProperties.put("spark.sql.create.version", sparkVersion);
+    }
+    // Split the schema string into multiple parts according to the schemaLengthThreshold size.
+    String schemaString = sparkSchema.json();
+    int numSchemaPart = (schemaString.length() + schemaLengthThreshold - 1) / 
schemaLengthThreshold;
+    sparkProperties.put("spark.sql.sources.schema.numParts", 
String.valueOf(numSchemaPart));
+    // Add each part of schema string to sparkProperties
+    for (int i = 0; i < numSchemaPart; i++) {
+      int start = i * schemaLengthThreshold;
+      int end = Math.min(start + schemaLengthThreshold, schemaString.length());
+      sparkProperties.put("spark.sql.sources.schema.part." + i, 
schemaString.substring(start, end));
+    }
+    // Add partition columns
+    if (!partitionNames.isEmpty()) {
+      sparkProperties.put(
+          "spark.sql.sources.schema.numPartCols", 
String.valueOf(partitionNames.size()));
+      for (int i = 0; i < partitionNames.size(); i++) {
+        sparkProperties.put("spark.sql.sources.schema.partCol." + i, 
partitionNames.get(i));
+      }
+    }
+    return sparkProperties;
+  }
+}
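
For example, assuming hypothetical partition columns, Spark version, and threshold values (none of which are defaults in this commit), the Spark table properties for a schema could be derived as follows:

    import java.util.Arrays;
    import java.util.Map;

    import org.apache.xtable.hudi.catalog.HudiCatalogTableUtils;
    import org.apache.xtable.model.schema.InternalSchema;

    public class SparkTablePropertiesExample {
      static Map<String, String> sparkPropertiesFor(InternalSchema schema) {
        return HudiCatalogTableUtils.getSparkTableProperties(
            Arrays.asList("year", "month"), // hypothetical partition column names
            "3.4.1",                        // Spark version to record in the catalog
            4000,                           // split the schema JSON into parts of at most this length
            schema);
      }
    }
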
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/hudi/catalog/HudiInputFormatUtils.java
 
b/xtable-core/src/main/java/org/apache/xtable/hudi/catalog/HudiInputFormatUtils.java
new file mode 100644
index 00000000..4d644494
--- /dev/null
+++ 
b/xtable-core/src/main/java/org/apache/xtable/hudi/catalog/HudiInputFormatUtils.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.hudi.catalog;
+
+import org.apache.hudi.common.model.HoodieFileFormat;
+
+import org.apache.xtable.exception.NotSupportedException;
+
+public class HudiInputFormatUtils {
+
+  /** Input format that provides a real-time view of data in a Hoodie table. */
+  private static final String HUDI_PARQUET_REALTIME_INPUT_FORMAT_CLASS =
+      "org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat";
+
+  /**
+   * HoodieInputFormat which understands the Hoodie File Structure and filters 
files based on the
+   * Hoodie Mode.
+   */
+  private static final String HUDI_PARQUET_INPUT_FORMAT_CLASS =
+      "org.apache.hudi.hadoop.HoodieParquetInputFormat";
+
+  /** HoodieRealtimeInputFormat for HUDI datasets which store data in HFile 
base file format. */
+  private static final String HUDI_HFILE_REALTIME_INPUT_FORMAT_CLASS =
+      "org.apache.hudi.hadoop.realtime.HoodieHFileRealtimeInputFormat";
+
+  /** HoodieInputFormat for HUDI datasets which store data in HFile base file 
format. */
+  private static final String HUDI_HFILE_INPUT_FORMAT_CLASS =
+      "org.apache.hudi.hadoop.HoodieHFileInputFormat";
+
+  /** A MapReduce/ Hive input format for ORC files. */
+  private static final String ORC_INPUT_FORMAT_CLASS =
+      "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat";
+
+  /** A Hive OutputFormat for ORC files. */
+  private static final String ORC_OUTPUT_FORMAT_CLASS =
+      "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat";
+
+  /** A Parquet OutputFormat for Hive (with the deprecated package mapred) */
+  private static final String MAPRED_PARQUET_OUTPUT_FORMAT_CLASS =
+      "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat";
+
+  /** A ParquetHiveSerDe for Hive (with the deprecated package mapred) */
+  private static final String PARQUET_SERDE_CLASS =
+      "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe";
+
+  /**
+   * A SerDe class for ORC. It transparently passes the object to/from the ORC file reader/writer.
+   */
+  private static final String ORC_SERDE_CLASS = 
"org.apache.hadoop.hive.ql.io.orc.OrcSerde";
+
+  public static String getInputFormatClassName(HoodieFileFormat 
baseFileFormat, boolean realtime) {
+    switch (baseFileFormat) {
+      case PARQUET:
+        if (realtime) {
+          return HUDI_PARQUET_REALTIME_INPUT_FORMAT_CLASS;
+        } else {
+          return HUDI_PARQUET_INPUT_FORMAT_CLASS;
+        }
+      case HFILE:
+        if (realtime) {
+          return HUDI_HFILE_REALTIME_INPUT_FORMAT_CLASS;
+        } else {
+          return HUDI_HFILE_INPUT_FORMAT_CLASS;
+        }
+      case ORC:
+        return ORC_INPUT_FORMAT_CLASS;
+      default:
+        throw new NotSupportedException(
+            "Hudi InputFormat not implemented for base file format " + 
baseFileFormat);
+    }
+  }
+
+  public static String getOutputFormatClassName(HoodieFileFormat 
baseFileFormat) {
+    switch (baseFileFormat) {
+      case PARQUET:
+      case HFILE:
+        return MAPRED_PARQUET_OUTPUT_FORMAT_CLASS;
+      case ORC:
+        return ORC_OUTPUT_FORMAT_CLASS;
+      default:
+        throw new NotSupportedException("No OutputFormat for base file format 
" + baseFileFormat);
+    }
+  }
+
+  public static String getSerDeClassName(HoodieFileFormat baseFileFormat) {
+    switch (baseFileFormat) {
+      case PARQUET:
+      case HFILE:
+        return PARQUET_SERDE_CLASS;
+      case ORC:
+        return ORC_SERDE_CLASS;
+      default:
+        throw new NotSupportedException("No SerDe for base file format " + 
baseFileFormat);
+    }
+  }
+}
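
A small usage sketch for a Parquet-backed copy-on-write table; passing realtime = true instead would select the real-time input format used for merge-on-read queries. The example class is hypothetical:

    import org.apache.hudi.common.model.HoodieFileFormat;

    import org.apache.xtable.hudi.catalog.HudiInputFormatUtils;

    public class HudiInputFormatExample {
      public static void main(String[] args) {
        // Resolve the Hive storage descriptor classes for Parquet base files.
        String inputFormat =
            HudiInputFormatUtils.getInputFormatClassName(HoodieFileFormat.PARQUET, false);
        String outputFormat =
            HudiInputFormatUtils.getOutputFormatClassName(HoodieFileFormat.PARQUET);
        String serde = HudiInputFormatUtils.getSerDeClassName(HoodieFileFormat.PARQUET);
        System.out.println(inputFormat + " / " + outputFormat + " / " + serde);
      }
    }
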
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/schema/SparkSchemaExtractor.java 
b/xtable-core/src/main/java/org/apache/xtable/schema/SparkSchemaExtractor.java
new file mode 100644
index 00000000..cda64941
--- /dev/null
+++ 
b/xtable-core/src/main/java/org/apache/xtable/schema/SparkSchemaExtractor.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.schema;
+
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.MetadataBuilder;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import org.apache.xtable.exception.NotSupportedException;
+import org.apache.xtable.exception.SchemaExtractorException;
+import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.InternalType;
+
+public class SparkSchemaExtractor {
+
+  private static final SparkSchemaExtractor INSTANCE = new 
SparkSchemaExtractor();
+  private static final String COMMENT = "comment";
+
+  public static SparkSchemaExtractor getInstance() {
+    return INSTANCE;
+  }
+
+  public StructType fromInternalSchema(InternalSchema internalSchema) {
+    StructField[] fields =
+        internalSchema.getFields().stream()
+            .map(
+                field ->
+                    new StructField(
+                        field.getName(),
+                        convertFieldType(field),
+                        field.getSchema().isNullable(),
+                        getMetaData(field.getSchema())))
+            .toArray(StructField[]::new);
+    return new StructType(fields);
+  }
+
+  private DataType convertFieldType(InternalField field) {
+    switch (field.getSchema().getDataType()) {
+      case ENUM:
+      case STRING:
+        return DataTypes.StringType;
+      case INT:
+        return DataTypes.IntegerType;
+      case LONG:
+      case TIMESTAMP_NTZ:
+        return DataTypes.LongType;
+      case BYTES:
+      case FIXED:
+      case UUID:
+        return DataTypes.BinaryType;
+      case BOOLEAN:
+        return DataTypes.BooleanType;
+      case FLOAT:
+        return DataTypes.FloatType;
+      case DATE:
+        return DataTypes.DateType;
+      case TIMESTAMP:
+        return DataTypes.TimestampType;
+      case DOUBLE:
+        return DataTypes.DoubleType;
+      case DECIMAL:
+        int precision =
+            (int) 
field.getSchema().getMetadata().get(InternalSchema.MetadataKey.DECIMAL_PRECISION);
+        int scale =
+            (int) 
field.getSchema().getMetadata().get(InternalSchema.MetadataKey.DECIMAL_SCALE);
+        return DataTypes.createDecimalType(precision, scale);
+      case RECORD:
+        return fromInternalSchema(field.getSchema());
+      case MAP:
+        InternalField key =
+            field.getSchema().getFields().stream()
+                .filter(
+                    mapField ->
+                        
InternalField.Constants.MAP_KEY_FIELD_NAME.equals(mapField.getName()))
+                .findFirst()
+                .orElseThrow(() -> new SchemaExtractorException("Invalid map 
schema"));
+        InternalField value =
+            field.getSchema().getFields().stream()
+                .filter(
+                    mapField ->
+                        
InternalField.Constants.MAP_VALUE_FIELD_NAME.equals(mapField.getName()))
+                .findFirst()
+                .orElseThrow(() -> new SchemaExtractorException("Invalid map 
schema"));
+        return DataTypes.createMapType(
+            convertFieldType(key), convertFieldType(value), 
value.getSchema().isNullable());
+      case LIST:
+        InternalField element =
+            field.getSchema().getFields().stream()
+                .filter(
+                    arrayField ->
+                        
InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME.equals(
+                            arrayField.getName()))
+                .findFirst()
+                .orElseThrow(() -> new SchemaExtractorException("Invalid array 
schema"));
+        return DataTypes.createArrayType(
+            convertFieldType(element), element.getSchema().isNullable());
+      default:
+        throw new NotSupportedException("Unsupported type: " + 
field.getSchema().getDataType());
+    }
+  }
+
+  private Metadata getMetaData(InternalSchema schema) {
+    InternalType type = schema.getDataType();
+    MetadataBuilder metadataBuilder = new MetadataBuilder();
+    if (type == InternalType.UUID) {
+      metadataBuilder.putString(InternalSchema.XTABLE_LOGICAL_TYPE, "uuid");
+    }
+    if (schema.getComment() != null) {
+      metadataBuilder.putString(COMMENT, schema.getComment());
+    }
+    return metadataBuilder.build();
+  }
+}
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSchemaExtractor.java
 
b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSchemaExtractor.java
index 361245fe..9c235a19 100644
--- 
a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSchemaExtractor.java
+++ 
b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSchemaExtractor.java
@@ -246,44 +246,12 @@ public class TestDeltaSchemaExtractor {
             .add("requiredDecimal", DataTypes.createDecimalType(10, 2), false)
             .add("optionalDecimal", DataTypes.createDecimalType(10, 2), true);
 
-    Assertions.assertEquals(
-        structRepresentation,
-        DeltaSchemaExtractor.getInstance().fromInternalSchema(internalSchema));
     Assertions.assertEquals(
         internalSchema, 
DeltaSchemaExtractor.getInstance().toInternalSchema(structRepresentation));
   }
 
   @Test
   public void testFixedBytes() {
-    InternalSchema internalSchemaOriginal =
-        InternalSchema.builder()
-            .name("struct")
-            .dataType(InternalType.RECORD)
-            .isNullable(false)
-            .fields(
-                Arrays.asList(
-                    InternalField.builder()
-                        .name("requiredFixed")
-                        .schema(
-                            InternalSchema.builder()
-                                .name("fixed")
-                                .dataType(InternalType.FIXED)
-                                .isNullable(false)
-                                .comment("comment")
-                                .build())
-                        .build(),
-                    InternalField.builder()
-                        .name("optionalFixed")
-                        .schema(
-                            InternalSchema.builder()
-                                .name("fixed")
-                                .dataType(InternalType.FIXED)
-                                .isNullable(true)
-                                .build())
-                        
.defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
-                        .build()))
-            .build();
-
     InternalSchema internalSchemaAfterRoundTrip =
         InternalSchema.builder()
             .name("struct")
@@ -317,9 +285,6 @@ public class TestDeltaSchemaExtractor {
             .add("requiredFixed", DataTypes.BinaryType, false, "comment")
             .add("optionalFixed", DataTypes.BinaryType, true);
 
-    Assertions.assertEquals(
-        structRepresentation,
-        
DeltaSchemaExtractor.getInstance().fromInternalSchema(internalSchemaOriginal));
     Assertions.assertEquals(
         internalSchemaAfterRoundTrip,
         
DeltaSchemaExtractor.getInstance().toInternalSchema(structRepresentation));
@@ -360,101 +325,14 @@ public class TestDeltaSchemaExtractor {
                         .build()))
             .build();
 
-    InternalSchema internalSchemaTimestampNtz =
-        InternalSchema.builder()
-            .name("struct")
-            .dataType(InternalType.RECORD)
-            .isNullable(false)
-            .fields(
-                Arrays.asList(
-                    InternalField.builder()
-                        .name("requiredTimestampNtz")
-                        .schema(
-                            InternalSchema.builder()
-                                .name("timestampNtz")
-                                .dataType(InternalType.TIMESTAMP_NTZ)
-                                .isNullable(false)
-                                .build())
-                        .build(),
-                    InternalField.builder()
-                        .name("optionalTimestampNtz")
-                        .schema(
-                            InternalSchema.builder()
-                                .name("timestampNtz")
-                                .dataType(InternalType.TIMESTAMP_NTZ)
-                                .isNullable(true)
-                                .build())
-                        
.defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
-                        .build()))
-            .build();
-
     StructType structRepresentationTimestamp =
         new StructType()
             .add("requiredTimestamp", DataTypes.TimestampType, false)
             .add("optionalTimestamp", DataTypes.TimestampType, true);
 
-    StructType structRepresentationTimestampNtz =
-        new StructType()
-            .add("requiredTimestampNtz", DataTypes.LongType, false)
-            .add("optionalTimestampNtz", DataTypes.LongType, true);
-
-    Assertions.assertEquals(
-        structRepresentationTimestamp,
-        
DeltaSchemaExtractor.getInstance().fromInternalSchema(internalSchemaTimestamp));
     Assertions.assertEquals(
         internalSchemaTimestamp,
         
DeltaSchemaExtractor.getInstance().toInternalSchema(structRepresentationTimestamp));
-    Assertions.assertEquals(
-        structRepresentationTimestampNtz,
-        
DeltaSchemaExtractor.getInstance().fromInternalSchema(internalSchemaTimestampNtz));
-  }
-
-  @Test
-  public void testEnums() {
-    Map<InternalSchema.MetadataKey, Object> requiredEnumMetadata = new 
HashMap<>();
-    requiredEnumMetadata.put(InternalSchema.MetadataKey.ENUM_VALUES, 
Arrays.asList("ONE", "TWO"));
-    Map<InternalSchema.MetadataKey, Object> optionalEnumMetadata = new 
HashMap<>();
-    optionalEnumMetadata.put(
-        InternalSchema.MetadataKey.ENUM_VALUES, Arrays.asList("THREE", 
"FOUR"));
-
-    InternalSchema internalSchema =
-        InternalSchema.builder()
-            .name("struct")
-            .dataType(InternalType.RECORD)
-            .isNullable(false)
-            .fields(
-                Arrays.asList(
-                    InternalField.builder()
-                        .name("requiredEnum")
-                        .schema(
-                            InternalSchema.builder()
-                                .name("REQUIRED_ENUM")
-                                .dataType(InternalType.ENUM)
-                                .isNullable(false)
-                                .metadata(requiredEnumMetadata)
-                                .build())
-                        .build(),
-                    InternalField.builder()
-                        .name("optionalEnum")
-                        .schema(
-                            InternalSchema.builder()
-                                .name("OPTIONAL_ENUM")
-                                .dataType(InternalType.ENUM)
-                                .isNullable(true)
-                                .metadata(optionalEnumMetadata)
-                                .build())
-                        
.defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
-                        .build()))
-            .build();
-
-    StructType structRepresentation =
-        new StructType()
-            .add("requiredEnum", DataTypes.StringType, false)
-            .add("optionalEnum", DataTypes.StringType, true);
-
-    Assertions.assertEquals(
-        structRepresentation,
-        DeltaSchemaExtractor.getInstance().fromInternalSchema(internalSchema));
   }
 
   @Test
@@ -567,9 +445,6 @@ public class TestDeltaSchemaExtractor {
                 false)
             .add("recordMap", DataTypes.createMapType(DataTypes.IntegerType, 
mapElement, true));
 
-    Assertions.assertEquals(
-        structRepresentation,
-        DeltaSchemaExtractor.getInstance().fromInternalSchema(internalSchema));
     Assertions.assertEquals(
         internalSchema, 
DeltaSchemaExtractor.getInstance().toInternalSchema(structRepresentation));
   }
@@ -660,9 +535,6 @@ public class TestDeltaSchemaExtractor {
             .add("intList", DataTypes.createArrayType(DataTypes.IntegerType, 
false), false)
             .add("recordList", DataTypes.createArrayType(elementSchema, true), 
true);
 
-    Assertions.assertEquals(
-        structRepresentation,
-        DeltaSchemaExtractor.getInstance().fromInternalSchema(internalSchema));
     Assertions.assertEquals(
         internalSchema, 
DeltaSchemaExtractor.getInstance().toInternalSchema(structRepresentation));
   }
@@ -757,9 +629,6 @@ public class TestDeltaSchemaExtractor {
                         false),
                 true,
                 "comment");
-    Assertions.assertEquals(
-        structRepresentation,
-        DeltaSchemaExtractor.getInstance().fromInternalSchema(internalSchema));
     Assertions.assertEquals(
         internalSchema, 
DeltaSchemaExtractor.getInstance().toInternalSchema(structRepresentation));
   }
@@ -939,9 +808,6 @@ public class TestDeltaSchemaExtractor {
                         
.defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
                         .build()))
             .build();
-    Assertions.assertEquals(
-        structRepresentation,
-        DeltaSchemaExtractor.getInstance().fromInternalSchema(internalSchema));
     Assertions.assertEquals(
         internalSchema, 
DeltaSchemaExtractor.getInstance().toInternalSchema(structRepresentation));
   }
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/hudi/catalog/TestHudiCatalogPartitionSyncTool.java
 
b/xtable-core/src/test/java/org/apache/xtable/hudi/catalog/TestHudiCatalogPartitionSyncTool.java
new file mode 100644
index 00000000..0c33013a
--- /dev/null
+++ 
b/xtable-core/src/test/java/org/apache/xtable/hudi/catalog/TestHudiCatalogPartitionSyncTool.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.hudi.catalog;
+
+import static 
org.apache.xtable.hudi.catalog.HudiCatalogPartitionSyncTool.LAST_COMMIT_COMPLETION_TIME_SYNC;
+import static 
org.apache.xtable.hudi.catalog.HudiCatalogPartitionSyncTool.LAST_COMMIT_TIME_SYNC;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Stream;
+
+import lombok.SneakyThrows;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockedStatic;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.TimelineUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.sync.common.model.PartitionValueExtractor;
+
+import org.apache.xtable.avro.AvroSchemaConverter;
+import org.apache.xtable.catalog.CatalogPartition;
+import org.apache.xtable.catalog.CatalogPartitionSyncOperations;
+import org.apache.xtable.hudi.HudiTableManager;
+import org.apache.xtable.model.InternalTable;
+import org.apache.xtable.model.catalog.ThreePartHierarchicalTableIdentifier;
+import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.InternalType;
+
+@ExtendWith(MockitoExtension.class)
+public class TestHudiCatalogPartitionSyncTool {
+
+  protected static final String HMS_DATABASE = "hms_db";
+  protected static final String HMS_TABLE = "hms_table";
+  protected static final String TEST_BASE_PATH = "test-base-path";
+
+  protected static String avroSchema =
+      
"{\"type\":\"record\",\"name\":\"SimpleRecord\",\"namespace\":\"com.example\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"partitionKey\",\"type\":\"string\"}]}";
+  protected static final InternalTable TEST_INTERNAL_TABLE_WITH_SCHEMA =
+      InternalTable.builder()
+          .basePath(TEST_BASE_PATH)
+          .readSchema(
+              AvroSchemaConverter.getInstance()
+                  .toInternalSchema(new Schema.Parser().parse(avroSchema)))
+          .partitioningFields(
+              Collections.singletonList(
+                  InternalPartitionField.builder()
+                      .sourceField(
+                          InternalField.builder()
+                              .name("partitionKey")
+                              .schema(
+                                  
InternalSchema.builder().dataType(InternalType.STRING).build())
+                              .build())
+                      .build()))
+          .build();
+
+  protected static final ThreePartHierarchicalTableIdentifier 
TEST_TABLE_IDENTIFIER =
+      new ThreePartHierarchicalTableIdentifier(HMS_DATABASE, HMS_TABLE);
+
+  @Mock private CatalogPartitionSyncOperations mockCatalogClient;
+  @Mock private HoodieTableMetaClient mockMetaClient;
+  @Mock private PartitionValueExtractor mockPartitionValueExtractor;
+  @Mock private HudiTableManager mockHudiTableManager;
+
+  private final Configuration mockConfiguration = new Configuration();
+  private HudiCatalogPartitionSyncTool mockHudiCatalogPartitionSyncTool;
+
+  private HudiCatalogPartitionSyncTool createMockHudiPartitionSyncTool() {
+    return new HudiCatalogPartitionSyncTool(
+        mockCatalogClient, mockHudiTableManager, mockPartitionValueExtractor, 
mockConfiguration);
+  }
+
+  private void setupCommonMocks() {
+    mockHudiCatalogPartitionSyncTool = createMockHudiPartitionSyncTool();
+  }
+
+  @SneakyThrows
+  @Test
+  void testSyncAllPartitions() {
+    setupCommonMocks();
+
+    String partitionKey1 = "key1";
+    String partitionKey2 = "key2";
+    ZonedDateTime zonedDateTime =
+        
Instant.ofEpochMilli(System.currentTimeMillis()).atZone(ZoneId.systemDefault());
+    try (MockedStatic<ZonedDateTime> mockZonedDateTime = 
mockStatic(ZonedDateTime.class);
+        MockedStatic<FSUtils> mockFSUtils = mockStatic(FSUtils.class)) {
+      mockZonedDateTime.when(ZonedDateTime::now).thenReturn(zonedDateTime);
+      List<String> mockedPartitions = Arrays.asList(partitionKey1, 
partitionKey2);
+      mockFSUtils
+          .when(() -> FSUtils.getAllPartitionPaths(any(), eq(TEST_BASE_PATH), 
eq(true), eq(false)))
+          .thenReturn(mockedPartitions);
+      mockFSUtils
+          .when(() -> FSUtils.getPartitionPath(new Path(TEST_BASE_PATH), 
partitionKey1))
+          .thenReturn(new Path(TEST_BASE_PATH + "/" + partitionKey1));
+      mockFSUtils
+          .when(() -> FSUtils.getPartitionPath(new Path(TEST_BASE_PATH), 
partitionKey2))
+          .thenReturn(new Path(TEST_BASE_PATH + "/" + partitionKey2));
+      when(mockHudiTableManager.loadTableMetaClientIfExists(TEST_BASE_PATH))
+          .thenReturn(Optional.of(mockMetaClient));
+      when(mockMetaClient.getBasePathV2()).thenReturn(new 
Path(TEST_BASE_PATH));
+      
when(mockPartitionValueExtractor.extractPartitionValuesInPath(partitionKey1))
+          .thenReturn(Collections.singletonList(partitionKey1));
+      
when(mockPartitionValueExtractor.extractPartitionValuesInPath(partitionKey2))
+          .thenReturn(Collections.singletonList(partitionKey2));
+
+      HoodieActiveTimeline mockTimeline = mock(HoodieActiveTimeline.class);
+      HoodieInstant instant1 =
+          new HoodieInstant(HoodieInstant.State.COMPLETED, "replacecommit", 
"100", "1000");
+      HoodieInstant instant2 =
+          new HoodieInstant(HoodieInstant.State.COMPLETED, "replacecommit", 
"101", "1100");
+      when(mockTimeline.countInstants()).thenReturn(2);
+      when(mockTimeline.lastInstant()).thenReturn(Option.of(instant2));
+      when(mockTimeline.getInstantsOrderedByStateTransitionTime())
+          .thenReturn(Stream.of(instant1, instant2));
+      when(mockMetaClient.getActiveTimeline()).thenReturn(mockTimeline);
+
+      CatalogPartition p1 =
+          new CatalogPartition(Collections.singletonList(partitionKey1), 
partitionKey1);
+      when(mockCatalogClient.getAllPartitions(TEST_TABLE_IDENTIFIER))
+          .thenReturn(Collections.singletonList(p1));
+
+      assertTrue(
+          mockHudiCatalogPartitionSyncTool.syncPartitions(
+              TEST_INTERNAL_TABLE_WITH_SCHEMA, TEST_TABLE_IDENTIFIER));
+
+      ArgumentCaptor<List<CatalogPartition>> addPartitionsCaptor =
+          ArgumentCaptor.forClass(List.class);
+      verify(mockCatalogClient, times(1))
+          .addPartitionsToTable(eq(TEST_TABLE_IDENTIFIER), 
addPartitionsCaptor.capture());
+      List<CatalogPartition> addedPartitions = addPartitionsCaptor.getValue();
+      assertEquals(1, addedPartitions.size());
+      assertEquals(
+          TEST_BASE_PATH + "/" + partitionKey2, 
addedPartitions.get(0).getStorageLocation());
+      assertEquals(1, addedPartitions.get(0).getValues().size());
+      assertEquals(partitionKey2, addedPartitions.get(0).getValues().get(0));
+
+      verify(mockCatalogClient, times(0)).dropPartitions(any(), anyList());
+
+      ArgumentCaptor<Map<String, String>> lastSyncedPropertiesCaptor =
+          ArgumentCaptor.forClass(Map.class);
+      verify(mockCatalogClient, times(1))
+          .updateTableProperties(eq(TEST_TABLE_IDENTIFIER), 
lastSyncedPropertiesCaptor.capture());
+      Map<String, String> lastSyncedProperties = 
lastSyncedPropertiesCaptor.getValue();
+      assertEquals("101", lastSyncedProperties.get(LAST_COMMIT_TIME_SYNC));
+      assertEquals("1100", 
lastSyncedProperties.get(LAST_COMMIT_COMPLETION_TIME_SYNC));
+    }
+  }
+
+  @SneakyThrows
+  @Test
+  void testSyncPartitionsSinceLastSyncTime() {
+    setupCommonMocks();
+
+    String partitionKey1 = "key1";
+    String partitionKey2 = "key2";
+    String partitionKey3 = "key3";
+    ZonedDateTime zonedDateTime =
+        
Instant.ofEpochMilli(System.currentTimeMillis()).atZone(ZoneId.systemDefault());
+    try (MockedStatic<ZonedDateTime> mockZonedDateTime = 
mockStatic(ZonedDateTime.class);
+        MockedStatic<FSUtils> mockFSUtils = mockStatic(FSUtils.class);
+        MockedStatic<TimelineUtils> mockedTimelineUtils = 
mockStatic(TimelineUtils.class)) {
+      mockZonedDateTime.when(ZonedDateTime::now).thenReturn(zonedDateTime);
+      List<String> mockedPartitions = Arrays.asList(partitionKey1, 
partitionKey2);
+      mockFSUtils
+          .when(() -> FSUtils.getAllPartitionPaths(any(), eq(TEST_BASE_PATH), 
eq(true), eq(false)))
+          .thenReturn(mockedPartitions);
+      mockFSUtils
+          .when(() -> FSUtils.getPartitionPath(new Path(TEST_BASE_PATH), 
partitionKey2))
+          .thenReturn(new Path(TEST_BASE_PATH + "/" + partitionKey2));
+      mockFSUtils
+          .when(() -> FSUtils.getPartitionPath(new Path(TEST_BASE_PATH), 
partitionKey3))
+          .thenReturn(new Path(TEST_BASE_PATH + "/" + partitionKey3));
+      when(mockHudiTableManager.loadTableMetaClientIfExists(TEST_BASE_PATH))
+          .thenReturn(Optional.of(mockMetaClient));
+      when(mockMetaClient.getBasePathV2()).thenReturn(new 
Path(TEST_BASE_PATH));
+      
when(mockPartitionValueExtractor.extractPartitionValuesInPath(partitionKey2))
+          .thenReturn(Collections.singletonList(partitionKey2));
+      
when(mockPartitionValueExtractor.extractPartitionValuesInPath(partitionKey3))
+          .thenReturn(Collections.singletonList(partitionKey3));
+
+      HoodieActiveTimeline mockTimeline = mock(HoodieActiveTimeline.class);
+      HoodieInstant instant1 =
+          new HoodieInstant(HoodieInstant.State.COMPLETED, "replacecommit", 
"100", "1000");
+      HoodieInstant instant2 =
+          new HoodieInstant(HoodieInstant.State.COMPLETED, "replacecommit", 
"101", "1100");
+
+      when(mockTimeline.countInstants()).thenReturn(2);
+      when(mockTimeline.lastInstant()).thenReturn(Option.of(instant2));
+      when(mockTimeline.getInstantsOrderedByStateTransitionTime())
+          .thenReturn(Stream.of(instant1, instant2));
+      when(mockMetaClient.getActiveTimeline()).thenReturn(mockTimeline);
+
+      Map<String, String> lastSyncedTimeProperties = new HashMap<>();
+      lastSyncedTimeProperties.put(LAST_COMMIT_TIME_SYNC, "100");
+      lastSyncedTimeProperties.put(LAST_COMMIT_COMPLETION_TIME_SYNC, "1000");
+      when(mockCatalogClient.getTableProperties(
+              TEST_TABLE_IDENTIFIER,
+              Arrays.asList(LAST_COMMIT_TIME_SYNC, 
LAST_COMMIT_COMPLETION_TIME_SYNC)))
+          .thenReturn(lastSyncedTimeProperties);
+      when(mockTimeline.isBeforeTimelineStarts("100")).thenReturn(false);
+
+      // prepare timeline util mocks
+      mockedTimelineUtils
+          .when(() -> TimelineUtils.getWrittenPartitions(any()))
+          .thenReturn(Arrays.asList(partitionKey2, partitionKey3));
+      mockedTimelineUtils
+          .when(
+              () -> TimelineUtils.getCommitsTimelineAfter(mockMetaClient, 
"100", Option.of("1000")))
+          .thenReturn(mockTimeline);
+      mockedTimelineUtils
+          .when(() -> TimelineUtils.getDroppedPartitions(mockTimeline))
+          .thenReturn(Collections.singletonList(partitionKey2));
+
+      CatalogPartition p1 =
+          new CatalogPartition(
+              Collections.singletonList(partitionKey1), TEST_BASE_PATH + "/" + 
partitionKey1);
+      CatalogPartition p2 =
+          new CatalogPartition(
+              Collections.singletonList(partitionKey2), TEST_BASE_PATH + "/" + 
partitionKey2);
+      when(mockCatalogClient.getAllPartitions(TEST_TABLE_IDENTIFIER))
+          .thenReturn(Arrays.asList(p1, p2));
+
+      assertTrue(
+          mockHudiCatalogPartitionSyncTool.syncPartitions(
+              TEST_INTERNAL_TABLE_WITH_SCHEMA, TEST_TABLE_IDENTIFIER));
+
+      // verify add partitions
+      ArgumentCaptor<ThreePartHierarchicalTableIdentifier> 
tableIdentifierArgumentCaptor =
+          ArgumentCaptor.forClass(ThreePartHierarchicalTableIdentifier.class);
+      ArgumentCaptor<List<CatalogPartition>> addPartitionsCaptor =
+          ArgumentCaptor.forClass(List.class);
+      verify(mockCatalogClient, times(1))
+          .addPartitionsToTable(
+              tableIdentifierArgumentCaptor.capture(), 
addPartitionsCaptor.capture());
+      List<CatalogPartition> addedPartitions = addPartitionsCaptor.getValue();
+      assertEquals(
+          TEST_TABLE_IDENTIFIER.getDatabaseName(),
+          tableIdentifierArgumentCaptor.getValue().getDatabaseName());
+      assertEquals(
+          TEST_TABLE_IDENTIFIER.getTableName(),
+          tableIdentifierArgumentCaptor.getValue().getTableName());
+      assertEquals(1, addedPartitions.size());
+      assertEquals(
+          TEST_BASE_PATH + "/" + partitionKey3, 
addedPartitions.get(0).getStorageLocation());
+      assertEquals(1, addedPartitions.get(0).getValues().size());
+      assertEquals(partitionKey3, addedPartitions.get(0).getValues().get(0));
+
+      // verify drop partitions
+      ArgumentCaptor<List<CatalogPartition>> dropPartitionsCaptor =
+          ArgumentCaptor.forClass(List.class);
+      verify(mockCatalogClient, times(1))
+          .dropPartitions(tableIdentifierArgumentCaptor.capture(), 
dropPartitionsCaptor.capture());
+      List<CatalogPartition> droppedPartitions = 
dropPartitionsCaptor.getValue();
+      assertEquals(
+          TEST_TABLE_IDENTIFIER.getDatabaseName(),
+          tableIdentifierArgumentCaptor.getValue().getDatabaseName());
+      assertEquals(
+          TEST_TABLE_IDENTIFIER.getTableName(),
+          tableIdentifierArgumentCaptor.getValue().getTableName());
+      assertEquals(1, droppedPartitions.size());
+      assertEquals(
+          TEST_BASE_PATH + "/" + partitionKey2, 
droppedPartitions.get(0).getStorageLocation());
+      assertEquals(1, droppedPartitions.get(0).getValues().size());
+      assertEquals(partitionKey2, droppedPartitions.get(0).getValues().get(0));
+
+      // verify update last synced properties
+      ArgumentCaptor<Map<String, String>> lastSyncedPropertiesCaptor =
+          ArgumentCaptor.forClass(Map.class);
+      verify(mockCatalogClient, times(1))
+          .updateTableProperties(eq(TEST_TABLE_IDENTIFIER), 
lastSyncedPropertiesCaptor.capture());
+      Map<String, String> lastSyncedProperties = 
lastSyncedPropertiesCaptor.getValue();
+      assertEquals("101", lastSyncedProperties.get(LAST_COMMIT_TIME_SYNC));
+      assertEquals("1100", 
lastSyncedProperties.get(LAST_COMMIT_COMPLETION_TIME_SYNC));
+    }
+  }
+}
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/hudi/catalog/TestHudiCatalogTableUtils.java
 
b/xtable-core/src/test/java/org/apache/xtable/hudi/catalog/TestHudiCatalogTableUtils.java
new file mode 100644
index 00000000..e39d63e5
--- /dev/null
+++ 
b/xtable-core/src/test/java/org/apache/xtable/hudi/catalog/TestHudiCatalogTableUtils.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.hudi.catalog;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.jupiter.api.Test;
+
+import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.InternalType;
+
+public class TestHudiCatalogTableUtils {
+
+  @Test
+  void testGetSparkTableProperties() {
+
+    List<String> partitionNames = Arrays.asList("region", "category");
+    String sparkVersion = "3.2.1";
+    int schemaLengthThreshold = 1000;
+    InternalSchema schema =
+        InternalSchema.builder()
+            .fields(
+                Arrays.asList(
+                    InternalField.builder()
+                        .name("id")
+                        .schema(
+                            InternalSchema.builder()
+                                .dataType(InternalType.INT)
+                                .isNullable(false)
+                                .build())
+                        .build(),
+                    InternalField.builder()
+                        .name("name")
+                        .schema(
+                            InternalSchema.builder()
+                                .dataType(InternalType.STRING)
+                                .isNullable(false)
+                                .build())
+                        .build(),
+                    InternalField.builder()
+                        .name("region")
+                        .schema(
+                            InternalSchema.builder()
+                                .dataType(InternalType.STRING)
+                                .isNullable(false)
+                                .build())
+                        .build(),
+                    InternalField.builder()
+                        .name("category")
+                        .schema(
+                            InternalSchema.builder()
+                                .dataType(InternalType.STRING)
+                                .isNullable(false)
+                                .build())
+                        .build()))
+            .dataType(InternalType.RECORD)
+            .name("testSchema")
+            .build();
+
+    Map<String, String> result =
+        HudiCatalogTableUtils.getSparkTableProperties(
+            partitionNames, sparkVersion, schemaLengthThreshold, schema);
+
+    // Validate results
+    assertEquals("hudi", result.get("spark.sql.sources.provider"));
+    assertEquals("3.2.1", result.get("spark.sql.create.version"));
+    assertEquals("1", result.get("spark.sql.sources.schema.numParts"));
+    assertEquals(
+        
"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":false,\"metadata\":{}},{\"name\":\"name\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}},{\"name\":\"region\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}},{\"name\":\"category\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}]}",
+        result.get("spark.sql.sources.schema.part.0"));
+    assertEquals("2", result.get("spark.sql.sources.schema.numPartCols"));
+    assertEquals("region", result.get("spark.sql.sources.schema.partCol.0"));
+    assertEquals("category", result.get("spark.sql.sources.schema.partCol.1"));
+  }
+
+  @Test
+  void testGetSparkTablePropertiesEmptyPartitions() {
+    // Setup input data with no partitions
+    List<String> partitionNames = Collections.emptyList();
+    int schemaLengthThreshold = 50;
+    InternalSchema schema =
+        InternalSchema.builder()
+            .fields(
+                Arrays.asList(
+                    InternalField.builder()
+                        .name("id")
+                        .schema(
+                            InternalSchema.builder()
+                                .dataType(InternalType.INT)
+                                .isNullable(false)
+                                .build())
+                        .build(),
+                    InternalField.builder()
+                        .name("name")
+                        .schema(
+                            InternalSchema.builder()
+                                .dataType(InternalType.STRING)
+                                .isNullable(false)
+                                .build())
+                        .build()))
+            .dataType(InternalType.RECORD)
+            .name("testSchema")
+            .build();
+
+    // Call the method
+    Map<String, String> result =
+        HudiCatalogTableUtils.getSparkTableProperties(
+            partitionNames, "", schemaLengthThreshold, schema);
+
+    assertEquals("hudi", result.get("spark.sql.sources.provider"));
+    assertNull(result.get("spark.sql.create.version"));
+    assertEquals("4", result.get("spark.sql.sources.schema.numParts"));
+    assertEquals(
+        
"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":false,\"metadata\":{}},{\"name\":\"name\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}]}",
+        result.get("spark.sql.sources.schema.part.0")
+            + result.get("spark.sql.sources.schema.part.1")
+            + result.get("spark.sql.sources.schema.part.2")
+            + result.get("spark.sql.sources.schema.part.3"));
+    assertNull(result.get("spark.sql.sources.schema.numPartCols"));
+  }
+}
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/hudi/catalog/TestHudiInputFormatUtils.java
 
b/xtable-core/src/test/java/org/apache/xtable/hudi/catalog/TestHudiInputFormatUtils.java
new file mode 100644
index 00000000..3d28efc0
--- /dev/null
+++ 
b/xtable-core/src/test/java/org/apache/xtable/hudi/catalog/TestHudiInputFormatUtils.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.hudi.catalog;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.junit.jupiter.api.Test;
+
+import org.apache.hudi.common.model.HoodieFileFormat;
+
+import org.apache.xtable.exception.NotSupportedException;
+
+public class TestHudiInputFormatUtils {
+
+  @Test
+  void testGetInputFormatClassName_Parquet() {
+    String inputFormat =
+        HudiInputFormatUtils.getInputFormatClassName(HoodieFileFormat.PARQUET, 
false);
+    assertEquals("org.apache.hudi.hadoop.HoodieParquetInputFormat", 
inputFormat);
+
+    String realtimeInputFormat =
+        HudiInputFormatUtils.getInputFormatClassName(HoodieFileFormat.PARQUET, 
true);
+    assertEquals(
+        "org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat", 
realtimeInputFormat);
+  }
+
+  @Test
+  void testGetInputFormatClassName_HFile() {
+    String inputFormat =
+        HudiInputFormatUtils.getInputFormatClassName(HoodieFileFormat.HFILE, 
false);
+    assertEquals("org.apache.hudi.hadoop.HoodieHFileInputFormat", inputFormat);
+
+    String realtimeInputFormat =
+        HudiInputFormatUtils.getInputFormatClassName(HoodieFileFormat.HFILE, 
true);
+    assertEquals(
+        "org.apache.hudi.hadoop.realtime.HoodieHFileRealtimeInputFormat", 
realtimeInputFormat);
+  }
+
+  @Test
+  void testGetInputFormatClassName_Orc() {
+    String inputFormat = 
HudiInputFormatUtils.getInputFormatClassName(HoodieFileFormat.ORC, false);
+    assertEquals("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat", 
inputFormat);
+  }
+
+  @Test
+  void testGetInputFormatClassName_UnsupportedFormat() {
+    Exception exception =
+        assertThrows(
+            NotSupportedException.class,
+            () -> 
HudiInputFormatUtils.getInputFormatClassName(HoodieFileFormat.HOODIE_LOG, 
false));
+    assertTrue(
+        exception.getMessage().contains("Hudi InputFormat not implemented for 
base file format"));
+  }
+
+  @Test
+  void testGetOutputFormatClassName() {
+    String parquetOutputFormat =
+        
HudiInputFormatUtils.getOutputFormatClassName(HoodieFileFormat.PARQUET);
+    assertEquals(
+        "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat", 
parquetOutputFormat);
+
+    String hfileOutputFormat =
+        HudiInputFormatUtils.getOutputFormatClassName(HoodieFileFormat.HFILE);
+    assertEquals(
+        "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat", 
hfileOutputFormat);
+
+    String orcOutputFormat = 
HudiInputFormatUtils.getOutputFormatClassName(HoodieFileFormat.ORC);
+    assertEquals("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat", 
orcOutputFormat);
+  }
+
+  @Test
+  void testGetOutputFormatClassName_UnsupportedFormat() {
+    Exception exception =
+        assertThrows(
+            NotSupportedException.class,
+            () -> 
HudiInputFormatUtils.getOutputFormatClassName(HoodieFileFormat.HOODIE_LOG));
+    assertTrue(exception.getMessage().contains("No OutputFormat for base file 
format"));
+  }
+
+  @Test
+  void testGetSerDeClassName() {
+    String parquetSerDe = 
HudiInputFormatUtils.getSerDeClassName(HoodieFileFormat.PARQUET);
+    
assertEquals("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe", 
parquetSerDe);
+
+    String hfileSerDe = 
HudiInputFormatUtils.getSerDeClassName(HoodieFileFormat.HFILE);
+    
assertEquals("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe", 
hfileSerDe);
+
+    String orcSerDe = 
HudiInputFormatUtils.getSerDeClassName(HoodieFileFormat.ORC);
+    assertEquals("org.apache.hadoop.hive.ql.io.orc.OrcSerde", orcSerDe);
+  }
+
+  @Test
+  void testGetSerDeClassName_UnsupportedFormat() {
+    Exception exception =
+        assertThrows(
+            NotSupportedException.class,
+            () -> 
HudiInputFormatUtils.getSerDeClassName(HoodieFileFormat.HOODIE_LOG));
+    assertTrue(exception.getMessage().contains("No SerDe for base file 
format"));
+  }
+}
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSchemaExtractor.java
 
b/xtable-core/src/test/java/org/apache/xtable/schema/TestSparkSchemaExtractor.java
similarity index 73%
copy from 
xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSchemaExtractor.java
copy to 
xtable-core/src/test/java/org/apache/xtable/schema/TestSparkSchemaExtractor.java
index 361245fe..59385f05 100644
--- 
a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSchemaExtractor.java
+++ 
b/xtable-core/src/test/java/org/apache/xtable/schema/TestSparkSchemaExtractor.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
  
-package org.apache.xtable.delta;
+package org.apache.xtable.schema;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -24,8 +24,6 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.spark.sql.types.DataTypes;
-import org.apache.spark.sql.types.Metadata;
-import org.apache.spark.sql.types.MetadataBuilder;
 import org.apache.spark.sql.types.StructType;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -34,15 +32,14 @@ import org.apache.xtable.model.schema.InternalField;
 import org.apache.xtable.model.schema.InternalSchema;
 import org.apache.xtable.model.schema.InternalType;
 
-public class TestDeltaSchemaExtractor {
-
+public class TestSparkSchemaExtractor {
   @Test
   public void testPrimitiveTypes() {
     Map<InternalSchema.MetadataKey, Object> decimalMetadata = new HashMap<>();
     decimalMetadata.put(InternalSchema.MetadataKey.DECIMAL_PRECISION, 10);
     decimalMetadata.put(InternalSchema.MetadataKey.DECIMAL_SCALE, 2);
 
-    InternalSchema internalSchema =
+    InternalSchema InternalSchemaRepresentation =
         InternalSchema.builder()
             .name("struct")
             .dataType(InternalType.RECORD)
@@ -56,7 +53,6 @@ public class TestDeltaSchemaExtractor {
                                 .name("boolean")
                                 .dataType(InternalType.BOOLEAN)
                                 .isNullable(false)
-                                .comment("requiredBooleanComment")
                                 .build())
                         .build(),
                     InternalField.builder()
@@ -227,7 +223,7 @@ public class TestDeltaSchemaExtractor {
 
     StructType structRepresentation =
         new StructType()
-            .add("requiredBoolean", DataTypes.BooleanType, false, 
"requiredBooleanComment")
+            .add("requiredBoolean", DataTypes.BooleanType, false)
             .add("optionalBoolean", DataTypes.BooleanType, true)
             .add("requiredInt", DataTypes.IntegerType, false)
             .add("optionalInt", DataTypes.IntegerType, true)
@@ -248,14 +244,12 @@ public class TestDeltaSchemaExtractor {
 
     Assertions.assertEquals(
         structRepresentation,
-        DeltaSchemaExtractor.getInstance().fromInternalSchema(internalSchema));
-    Assertions.assertEquals(
-        internalSchema, 
DeltaSchemaExtractor.getInstance().toInternalSchema(structRepresentation));
+        
SparkSchemaExtractor.getInstance().fromInternalSchema(InternalSchemaRepresentation));
   }
 
   @Test
   public void testFixedBytes() {
-    InternalSchema internalSchemaOriginal =
+    InternalSchema InternalSchemaRepresentationOriginal =
         InternalSchema.builder()
             .name("struct")
             .dataType(InternalType.RECORD)
@@ -269,7 +263,6 @@ public class TestDeltaSchemaExtractor {
                                 .name("fixed")
                                 .dataType(InternalType.FIXED)
                                 .isNullable(false)
-                                .comment("comment")
                                 .build())
                         .build(),
                     InternalField.builder()
@@ -284,7 +277,7 @@ public class TestDeltaSchemaExtractor {
                         .build()))
             .build();
 
-    InternalSchema internalSchemaAfterRoundTrip =
+    InternalSchema InternalSchemaRepresentationAfterRoundTrip =
         InternalSchema.builder()
             .name("struct")
             .dataType(InternalType.RECORD)
@@ -298,7 +291,6 @@ public class TestDeltaSchemaExtractor {
                                 .name("binary")
                                 .dataType(InternalType.BYTES)
                                 .isNullable(false)
-                                .comment("comment")
                                 .build())
                         .build(),
                     InternalField.builder()
@@ -314,15 +306,13 @@ public class TestDeltaSchemaExtractor {
             .build();
     StructType structRepresentation =
         new StructType()
-            .add("requiredFixed", DataTypes.BinaryType, false, "comment")
+            .add("requiredFixed", DataTypes.BinaryType, false)
             .add("optionalFixed", DataTypes.BinaryType, true);
 
     Assertions.assertEquals(
         structRepresentation,
-        
DeltaSchemaExtractor.getInstance().fromInternalSchema(internalSchemaOriginal));
-    Assertions.assertEquals(
-        internalSchemaAfterRoundTrip,
-        
DeltaSchemaExtractor.getInstance().toInternalSchema(structRepresentation));
+        SparkSchemaExtractor.getInstance()
+            .fromInternalSchema(InternalSchemaRepresentationOriginal));
   }
 
   @Test
@@ -330,7 +320,7 @@ public class TestDeltaSchemaExtractor {
     Map<InternalSchema.MetadataKey, Object> metadata =
         Collections.singletonMap(
             InternalSchema.MetadataKey.TIMESTAMP_PRECISION, 
InternalSchema.MetadataValue.MICROS);
-    InternalSchema internalSchemaTimestamp =
+    InternalSchema InternalSchemaRepresentationTimestamp =
         InternalSchema.builder()
             .name("struct")
             .dataType(InternalType.RECORD)
@@ -360,7 +350,7 @@ public class TestDeltaSchemaExtractor {
                         .build()))
             .build();
 
-    InternalSchema internalSchemaTimestampNtz =
+    InternalSchema InternalSchemaRepresentationTimestampNtz =
         InternalSchema.builder()
             .name("struct")
             .dataType(InternalType.RECORD)
@@ -400,13 +390,12 @@ public class TestDeltaSchemaExtractor {
 
     Assertions.assertEquals(
         structRepresentationTimestamp,
-        
DeltaSchemaExtractor.getInstance().fromInternalSchema(internalSchemaTimestamp));
-    Assertions.assertEquals(
-        internalSchemaTimestamp,
-        
DeltaSchemaExtractor.getInstance().toInternalSchema(structRepresentationTimestamp));
+        SparkSchemaExtractor.getInstance()
+            .fromInternalSchema(InternalSchemaRepresentationTimestamp));
     Assertions.assertEquals(
         structRepresentationTimestampNtz,
-        
DeltaSchemaExtractor.getInstance().fromInternalSchema(internalSchemaTimestampNtz));
+        SparkSchemaExtractor.getInstance()
+            .fromInternalSchema(InternalSchemaRepresentationTimestampNtz));
   }
 
   @Test
@@ -417,7 +406,7 @@ public class TestDeltaSchemaExtractor {
     optionalEnumMetadata.put(
         InternalSchema.MetadataKey.ENUM_VALUES, Arrays.asList("THREE", 
"FOUR"));
 
-    InternalSchema internalSchema =
+    InternalSchema InternalSchemaRepresentation =
         InternalSchema.builder()
             .name("struct")
             .dataType(InternalType.RECORD)
@@ -454,7 +443,7 @@ public class TestDeltaSchemaExtractor {
 
     Assertions.assertEquals(
         structRepresentation,
-        DeltaSchemaExtractor.getInstance().fromInternalSchema(internalSchema));
+        
SparkSchemaExtractor.getInstance().fromInternalSchema(InternalSchemaRepresentation));
   }
 
   @Test
@@ -488,7 +477,7 @@ public class TestDeltaSchemaExtractor {
                         .build()))
             .dataType(InternalType.RECORD)
             .build();
-    InternalSchema internalSchema =
+    InternalSchema InternalSchemaRepresentation =
         InternalSchema.builder()
             .name("struct")
             .dataType(InternalType.RECORD)
@@ -569,9 +558,7 @@ public class TestDeltaSchemaExtractor {
 
     Assertions.assertEquals(
         structRepresentation,
-        DeltaSchemaExtractor.getInstance().fromInternalSchema(internalSchema));
-    Assertions.assertEquals(
-        internalSchema, 
DeltaSchemaExtractor.getInstance().toInternalSchema(structRepresentation));
+        
SparkSchemaExtractor.getInstance().fromInternalSchema(InternalSchemaRepresentation));
   }
 
   @Test
@@ -605,7 +592,7 @@ public class TestDeltaSchemaExtractor {
                         .build()))
             .dataType(InternalType.RECORD)
             .build();
-    InternalSchema internalSchema =
+    InternalSchema InternalSchemaRepresentation =
         InternalSchema.builder()
             .name("struct")
             .dataType(InternalType.RECORD)
@@ -662,14 +649,12 @@ public class TestDeltaSchemaExtractor {
 
     Assertions.assertEquals(
         structRepresentation,
-        DeltaSchemaExtractor.getInstance().fromInternalSchema(internalSchema));
-    Assertions.assertEquals(
-        internalSchema, 
DeltaSchemaExtractor.getInstance().toInternalSchema(structRepresentation));
+        
SparkSchemaExtractor.getInstance().fromInternalSchema(InternalSchemaRepresentation));
   }
 
   @Test
   public void testNestedRecords() {
-    InternalSchema internalSchema =
+    InternalSchema InternalSchemaRepresentation =
         InternalSchema.builder()
             .name("struct")
             .dataType(InternalType.RECORD)
@@ -684,7 +669,6 @@ public class TestDeltaSchemaExtractor {
                                 .name("struct")
                                 .dataType(InternalType.RECORD)
                                 .isNullable(true)
-                                .comment("comment")
                                 .fields(
                                     Arrays.asList(
                                         InternalField.builder()
@@ -695,7 +679,6 @@ public class TestDeltaSchemaExtractor {
                                                     .name("integer")
                                                     .dataType(InternalType.INT)
                                                     .isNullable(true)
-                                                    
.comment("nestedOptionalIntComment")
                                                     .build())
                                             .defaultValue(
                                                 
InternalField.Constants.NULL_DEFAULT_VALUE)
@@ -745,204 +728,15 @@ public class TestDeltaSchemaExtractor {
             .add(
                 "nestedOne",
                 new StructType()
-                    .add(
-                        "nestedOptionalInt",
-                        DataTypes.IntegerType,
-                        true,
-                        "nestedOptionalIntComment")
+                    .add("nestedOptionalInt", DataTypes.IntegerType, true)
                     .add("nestedRequiredDouble", DataTypes.DoubleType, false)
                     .add(
                         "nestedTwo",
                         new StructType().add("doublyNestedString", 
DataTypes.StringType, true),
                         false),
-                true,
-                "comment");
-    Assertions.assertEquals(
-        structRepresentation,
-        DeltaSchemaExtractor.getInstance().fromInternalSchema(internalSchema));
-    Assertions.assertEquals(
-        internalSchema, 
DeltaSchemaExtractor.getInstance().toInternalSchema(structRepresentation));
-  }
-
-  @Test
-  public void testFieldIdsInDeltaSchema() {
-    StructType structRepresentation =
-        new StructType()
-            .add(
-                "nestedOne",
-                new StructType()
-                    .add(
-                        "nestedOptionalInt",
-                        DataTypes.IntegerType,
-                        true,
-                        Metadata.fromJson("{\"delta.columnMapping.id\": 3}"))
-                    .add(
-                        "nestedRequiredDouble",
-                        DataTypes.DoubleType,
-                        false,
-                        Metadata.fromJson("{\"delta.columnMapping.id\": 5}"))
-                    .add(
-                        "nestedTwo",
-                        new StructType()
-                            .add(
-                                "doublyNestedString",
-                                DataTypes.StringType,
-                                true,
-                                
Metadata.fromJson("{\"delta.columnMapping.id\": 12}")),
-                        false,
-                        Metadata.fromJson("{\"delta.columnMapping.id\": 10}")),
-                true,
-                Metadata.fromJson("{\"delta.columnMapping.id\": 2}"));
-
-    InternalSchema internalSchema =
-        InternalSchema.builder()
-            .name("struct")
-            .dataType(InternalType.RECORD)
-            .isNullable(false)
-            .fields(
-                Collections.singletonList(
-                    InternalField.builder()
-                        .name("nestedOne")
-                        .fieldId(2)
-                        
.defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
-                        .schema(
-                            InternalSchema.builder()
-                                .name("struct")
-                                .dataType(InternalType.RECORD)
-                                .isNullable(true)
-                                .fields(
-                                    Arrays.asList(
-                                        InternalField.builder()
-                                            .name("nestedOptionalInt")
-                                            .fieldId(3)
-                                            .parentPath("nestedOne")
-                                            .schema(
-                                                InternalSchema.builder()
-                                                    .name("integer")
-                                                    .dataType(InternalType.INT)
-                                                    .isNullable(true)
-                                                    .build())
-                                            .defaultValue(
-                                                
InternalField.Constants.NULL_DEFAULT_VALUE)
-                                            .build(),
-                                        InternalField.builder()
-                                            .name("nestedRequiredDouble")
-                                            .fieldId(5)
-                                            .parentPath("nestedOne")
-                                            .schema(
-                                                InternalSchema.builder()
-                                                    .name("double")
-                                                    
.dataType(InternalType.DOUBLE)
-                                                    .isNullable(false)
-                                                    .build())
-                                            .build(),
-                                        InternalField.builder()
-                                            .name("nestedTwo")
-                                            .fieldId(10)
-                                            .parentPath("nestedOne")
-                                            .schema(
-                                                InternalSchema.builder()
-                                                    .name("struct")
-                                                    .dataType(InternalType.RECORD)
-                                                    .isNullable(false)
-                                                    .fields(
-                                                        Collections.singletonList(
-                                                            InternalField.builder()
-                                                                .name("doublyNestedString")
-                                                                .fieldId(12)
-                                                                .parentPath("nestedOne.nestedTwo")
-                                                                .schema(
-                                                                    InternalSchema.builder()
-                                                                        .name("string")
-                                                                        .dataType(
-                                                                            InternalType.STRING)
-                                                                        .isNullable(true)
-                                                                        .build())
-                                                                .defaultValue(
-                                                                    InternalField.Constants
-                                                                        .NULL_DEFAULT_VALUE)
-                                                                .build()))
-                                                    .build())
-                                            .build()))
-                                .build())
-                        .build()))
-            .build();
-    Assertions.assertEquals(
-        internalSchema, DeltaSchemaExtractor.getInstance().toInternalSchema(structRepresentation));
-  }
-
-  @Test
-  void generateColumnsAreNotTranslatedToInternalSchema() {
-    StructType structRepresentation =
-        new StructType()
-            .add("birthDate", DataTypes.TimestampType, false)
-            .add(
-                "birthYear",
-                DataTypes.TimestampType,
-                true,
-                Metadata.fromJson("{\"delta.generationExpression\":\"YEAR(birthDate)\"}"));
-    InternalSchema internalSchema =
-        InternalSchema.builder()
-            .dataType(InternalType.RECORD)
-            .name("struct")
-            .fields(
-                Collections.singletonList(
-                    InternalField.builder()
-                        .schema(
-                            InternalSchema.builder()
-                                .name("timestamp")
-                                .dataType(InternalType.TIMESTAMP)
-                                .metadata(
-                                    Collections.singletonMap(
-                                        InternalSchema.MetadataKey.TIMESTAMP_PRECISION,
-                                        InternalSchema.MetadataValue.MICROS))
-                                .build())
-                        .name("birthDate")
-                        .build()))
-            .build();
-    Assertions.assertEquals(
-        internalSchema, DeltaSchemaExtractor.getInstance().toInternalSchema(structRepresentation));
-  }
-
-  @Test
-  public void testIcebergToDeltaUUIDSupport() {
-    Metadata metadata =
-        new MetadataBuilder().putString(InternalSchema.XTABLE_LOGICAL_TYPE, "uuid").build();
-    StructType structRepresentation =
-        new StructType()
-            .add("requiredUUID", DataTypes.BinaryType, false, metadata)
-            .add("optionalUUID", DataTypes.BinaryType, true, metadata);
-    InternalSchema internalSchema =
-        InternalSchema.builder()
-            .name("struct")
-            .dataType(InternalType.RECORD)
-            .isNullable(false)
-            .fields(
-                Arrays.asList(
-                    InternalField.builder()
-                        .name("requiredUUID")
-                        .schema(
-                            InternalSchema.builder()
-                                .name("binary")
-                                .dataType(InternalType.UUID)
-                                .isNullable(false)
-                                .build())
-                        .build(),
-                    InternalField.builder()
-                        .name("optionalUUID")
-                        .schema(
-                            InternalSchema.builder()
-                                .name("binary")
-                                .dataType(InternalType.UUID)
-                                .isNullable(true)
-                                .build())
-                        .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
-                        .build()))
-            .build();
+                true);
     Assertions.assertEquals(
         structRepresentation,
-        DeltaSchemaExtractor.getInstance().fromInternalSchema(internalSchema));
-    Assertions.assertEquals(
-        internalSchema, DeltaSchemaExtractor.getInstance().toInternalSchema(structRepresentation));
+        SparkSchemaExtractor.getInstance().fromInternalSchema(InternalSchemaRepresentation));
   }
 }
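
Side note for reviewers: the hunk above swaps the Delta-specific round trip for SparkSchemaExtractor. As a rough orientation, here is a minimal, self-contained sketch of driving that extractor directly. It reuses only builder calls that appear in the removed test; the package locations of the Internal* model classes and the exact return type of fromInternalSchema are assumptions, not taken from this commit.

import java.util.Arrays;

import org.apache.spark.sql.types.StructType;

// Assumed package locations; adjust to wherever the model classes actually live.
import org.apache.xtable.model.schema.InternalField;
import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.schema.InternalType;
import org.apache.xtable.schema.SparkSchemaExtractor;

class SparkSchemaExtractorSketch {
  static StructType toSparkStruct() {
    // Build a two-field record: a required string and an optional int with a null default.
    InternalSchema internalSchema =
        InternalSchema.builder()
            .name("struct")
            .dataType(InternalType.RECORD)
            .isNullable(false)
            .fields(
                Arrays.asList(
                    InternalField.builder()
                        .name("requiredString")
                        .schema(
                            InternalSchema.builder()
                                .name("string")
                                .dataType(InternalType.STRING)
                                .isNullable(false)
                                .build())
                        .build(),
                    InternalField.builder()
                        .name("optionalInt")
                        .schema(
                            InternalSchema.builder()
                                .name("integer")
                                .dataType(InternalType.INT)
                                .isNullable(true)
                                .build())
                        .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
                        .build()))
            .build();

    // fromInternalSchema is assumed to return the equivalent Spark StructType,
    // matching the assertion pattern in the rewritten test above.
    return SparkSchemaExtractor.getInstance().fromInternalSchema(internalSchema);
  }
}
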
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/testutil/ITTestUtils.java 
b/xtable-core/src/test/java/org/apache/xtable/testutil/ITTestUtils.java
index 4505d851..75c3833a 100644
--- a/xtable-core/src/test/java/org/apache/xtable/testutil/ITTestUtils.java
+++ b/xtable-core/src/test/java/org/apache/xtable/testutil/ITTestUtils.java
@@ -21,6 +21,7 @@ package org.apache.xtable.testutil;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
 import org.junit.jupiter.api.Assertions;
@@ -148,6 +149,7 @@ public class ITTestUtils {
           .name("source_table_name")
           .basePath("file://base_path/v1/")
           .formatName("ICEBERG")
+          .additionalProperties(new Properties())
           .build();
     }
 

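For context on the ITTestUtils tweak above: the fixture now seeds an empty java.util.Properties into the source-table builder. A minimal sketch of the same builder chain follows; the SourceTable class name and its org.apache.xtable.conversion package are assumptions, since the hunk does not show the enclosing type.

import java.util.Properties;

// Assumed class/package for the builder shown in the hunk above.
import org.apache.xtable.conversion.SourceTable;

class SourceTableFixtureSketch {
  static SourceTable icebergSource() {
    // Mirrors the fixture: name, base path, format, and an empty (non-null) properties bag.
    return SourceTable.builder()
        .name("source_table_name")
        .basePath("file://base_path/v1/")
        .formatName("ICEBERG")
        .additionalProperties(new Properties())
        .build();
  }
}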