This is an automated email from the ASF dual-hosted git repository.

vinish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git


The following commit(s) were added to refs/heads/main by this push:
     new fb724f0c [590] Add interfaces for CatalogSyncClient and CatalogSync
fb724f0c is described below

commit fb724f0c8989c684259c24ebad3cefbefdc43432
Author: Vinish Reddy <[email protected]>
AuthorDate: Thu Dec 26 11:10:59 2024 -0800

    [590] Add interfaces for CatalogSyncClient and CatalogSync
---
 pom.xml                                            |  15 ++-
 rfc/template.md                                    |  55 ---------
 xtable-api/pom.xml                                 |   4 +
 .../org/apache/xtable/conversion/SourceTable.java  |   2 +-
 .../CatalogTableIdentifier.java}                   |  25 +---
 .../HierarchicalTableIdentifier.java}              |  30 ++---
 .../ThreePartHierarchicalTableIdentifier.java      | 105 +++++++++++++++++
 ...ErrorCode.java => CatalogRefreshException.java} |  23 ++--
 .../apache/xtable/model/exception/ErrorCode.java   |   3 +-
 .../org/apache/xtable/model/sync/SyncResult.java   |  28 ++++-
 .../extractor/CatalogConversionSource.java}        |  30 ++---
 .../xtable/spi/extractor/ConversionSource.java     |   9 ++
 .../org/apache/xtable/spi/sync/CatalogSync.java    | 129 +++++++++++++++++++++
 .../apache/xtable/spi/sync/CatalogSyncClient.java  |  71 ++++++++++++
 .../org/apache/xtable/spi/sync/CatalogUtils.java   |  63 ++++++++++
 .../apache/xtable/spi/sync/TableFormatSync.java    |  13 ++-
 .../TestThreePartHierarchicalTableIdentifier.java  |  50 ++++++++
 .../apache/xtable/spi/sync/TestCatalogSync.java    | 128 ++++++++++++++++++++
 .../apache/xtable/spi/sync/TestCatalogUtils.java   |  72 ++++++++++++
 .../xtable/spi/sync/TestTableFormatSync.java       |  38 +++---
 .../xtable/conversion/ConversionController.java    |   4 +-
 .../apache/xtable/conversion/ConversionUtils.java  |  57 ---------
 .../apache/xtable/delta/DeltaConversionSource.java |   7 ++
 .../apache/xtable/hudi/HudiConversionSource.java   |  13 +++
 .../xtable/iceberg/IcebergConversionSource.java    |   7 ++
 .../org/apache/xtable/ITConversionController.java  |   3 +-
 .../conversion/TestConversionController.java       |   2 +-
 .../xtable/conversion/TestConversionUtils.java     | 111 ------------------
 .../delta/ITDeltaConversionTargetSource.java       |  55 ++++++---
 .../xtable/hudi/ITHudiConversionSourceSource.java  | 125 ++++++++++++++++++++
 .../iceberg/ITIcebergConversionTargetSource.java   |  69 +++++++++++
 .../xtable/iceberg/TestIcebergDataHelper.java      |   8 ++
 .../org/apache/xtable/testutil/ITTestUtils.java    |  47 ++++++++
 .../apache/xtable/hudi/sync/XTableSyncTool.java    |   2 +-
 34 files changed, 1061 insertions(+), 342 deletions(-)

diff --git a/pom.xml b/pom.xml
index 99b4fe1a..7a597342 100644
--- a/pom.xml
+++ b/pom.xml
@@ -68,6 +68,7 @@
         <maven-gpg-plugin.version>3.2.4</maven-gpg-plugin.version>
         <maven-deploy-plugin.version>3.1.1</maven-deploy-plugin.version>
         <maven-release-plugin.version>2.5.3</maven-release-plugin.version>
+        <mockito.version>5.2.0</mockito.version>
         <parquet.version>1.12.2</parquet.version>
         <protobuf.version>3.25.5</protobuf.version>
         <scala12.version>2.12.20</scala12.version>
@@ -438,7 +439,19 @@
             <dependency>
                 <groupId>org.mockito</groupId>
                 <artifactId>mockito-core</artifactId>
-                <version>4.8.0</version>
+                <version>${mockito.version}</version>
+                <scope>test</scope>
+            </dependency>
+            <dependency>
+                <groupId>org.mockito</groupId>
+                <artifactId>mockito-junit-jupiter</artifactId>
+                <scope>test</scope>
+                <version>${mockito.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.mockito</groupId>
+                <artifactId>mockito-inline</artifactId>
+                <version>${mockito.version}</version>
                 <scope>test</scope>
             </dependency>
 
diff --git a/rfc/template.md b/rfc/template.md
deleted file mode 100644
index 75ab32a4..00000000
--- a/rfc/template.md
+++ /dev/null
@@ -1,55 +0,0 @@
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-# RFC-[number]: [Title]
-
-## Proposers
-
-- @<proposer1 github username>
-- @<proposer2 github username>
-
-## Approvers
-- @<approver1 github username>
-- @<approver2 github username>
-
-## Status
-
-GH Feature Request: <link to umbrella JIRA>
-
-> Please keep the status updated in `rfc/README.md`.
-
-## Abstract
-
-Describe the problem you are trying to solve and a brief description of why 
it’s needed.
-
-## Background
-Introduce any background context which is relevant or necessary to understand 
the feature and design choices.
-
-## Implementation
-Describe the new thing you want to do in appropriate detail, how it fits into 
the project architecture.<br>
-Provide a detailed description of how you intend to implement this feature, 
this may be fairly extensive and have large subsections of its own or it may be 
a few sentences.<br>
-Use judgement to decide on how detailed the description needs to be based on 
the scope of the change. If unclear, you can ask questions in 
[email protected].
-
-## Rollout/Adoption Plan
-
-- Are there any breaking changes as part of this new feature/functionality?
-- What impact (if any) will there be on existing users?
-- If we are changing behavior how will we phase out the older behavior? When 
will we remove the existing behavior? 
-- If we need special migration tools, describe them here.
-
-## Test Plan
-
-Describe in few sentences how the RFC will be tested. How will we know that 
the implementation works as expected? How will we know nothing breaks?
\ No newline at end of file
diff --git a/xtable-api/pom.xml b/xtable-api/pom.xml
index 9436b775..43aa7bac 100644
--- a/xtable-api/pom.xml
+++ b/xtable-api/pom.xml
@@ -84,5 +84,9 @@
             <artifactId>mockito-core</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-junit-jupiter</artifactId>
+        </dependency>
     </dependencies>
 </project>
diff --git 
a/xtable-api/src/main/java/org/apache/xtable/conversion/SourceTable.java 
b/xtable-api/src/main/java/org/apache/xtable/conversion/SourceTable.java
index b37e1c1e..f3e1c359 100644
--- a/xtable-api/src/main/java/org/apache/xtable/conversion/SourceTable.java
+++ b/xtable-api/src/main/java/org/apache/xtable/conversion/SourceTable.java
@@ -28,7 +28,7 @@ import lombok.NonNull;
 @EqualsAndHashCode(callSuper = true)
 @Getter
 public class SourceTable extends ExternalTable {
-  /** The path to the data files, defaults to the metadataPath */
+  /** The path to the data files, defaults to the basePath */
   @NonNull private final String dataPath;
 
   @Builder(toBuilder = true)
diff --git 
a/xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java 
b/xtable-api/src/main/java/org/apache/xtable/model/catalog/CatalogTableIdentifier.java
similarity index 63%
copy from 
xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java
copy to 
xtable-api/src/main/java/org/apache/xtable/model/catalog/CatalogTableIdentifier.java
index 920a95f4..7679035f 100644
--- a/xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java
+++ 
b/xtable-api/src/main/java/org/apache/xtable/model/catalog/CatalogTableIdentifier.java
@@ -16,25 +16,10 @@
  * limitations under the License.
  */
  
-package org.apache.xtable.model.exception;
+package org.apache.xtable.model.catalog;
 
-import lombok.Getter;
-
-@Getter
-public enum ErrorCode {
-  INVALID_CONFIGURATION(10001),
-  INVALID_PARTITION_SPEC(10002),
-  INVALID_PARTITION_VALUE(10003),
-  READ_EXCEPTION(10004),
-  UPDATE_EXCEPTION(10005),
-  INVALID_SCHEMA(10006),
-  UNSUPPORTED_SCHEMA_TYPE(10007),
-  UNSUPPORTED_FEATURE(10008),
-  PARSE_EXCEPTION(10009);
-
-  private final int errorCode;
-
-  ErrorCode(int errorCode) {
-    this.errorCode = errorCode;
-  }
+/** Represents a catalog table identifier in a multi-level catalog system. */
+public interface CatalogTableIdentifier {
+  /** Returns the string identifier for the table within its catalog context */
+  String getId();
 }
diff --git 
a/xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java 
b/xtable-api/src/main/java/org/apache/xtable/model/catalog/HierarchicalTableIdentifier.java
similarity index 60%
copy from 
xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java
copy to 
xtable-api/src/main/java/org/apache/xtable/model/catalog/HierarchicalTableIdentifier.java
index 920a95f4..e6d7cf0e 100644
--- a/xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java
+++ 
b/xtable-api/src/main/java/org/apache/xtable/model/catalog/HierarchicalTableIdentifier.java
@@ -16,25 +16,19 @@
  * limitations under the License.
  */
  
-package org.apache.xtable.model.exception;
+package org.apache.xtable.model.catalog;
 
-import lombok.Getter;
-
-@Getter
-public enum ErrorCode {
-  INVALID_CONFIGURATION(10001),
-  INVALID_PARTITION_SPEC(10002),
-  INVALID_PARTITION_VALUE(10003),
-  READ_EXCEPTION(10004),
-  UPDATE_EXCEPTION(10005),
-  INVALID_SCHEMA(10006),
-  UNSUPPORTED_SCHEMA_TYPE(10007),
-  UNSUPPORTED_FEATURE(10008),
-  PARSE_EXCEPTION(10009);
+/**
+ * Represents a hierarchical table identifier, often including catalog, 
database (or schema), and
+ * table names. Some catalogs may omit the catalog name.
+ */
+public interface HierarchicalTableIdentifier extends CatalogTableIdentifier {
+  /** @return the catalog name if present, otherwise null */
+  String getCatalogName();
 
-  private final int errorCode;
+  /** @return the database (or schema) name; required */
+  String getDatabaseName();
 
-  ErrorCode(int errorCode) {
-    this.errorCode = errorCode;
-  }
+  /** @return the table name; required */
+  String getTableName();
 }
diff --git 
a/xtable-api/src/main/java/org/apache/xtable/model/catalog/ThreePartHierarchicalTableIdentifier.java
 
b/xtable-api/src/main/java/org/apache/xtable/model/catalog/ThreePartHierarchicalTableIdentifier.java
new file mode 100644
index 00000000..2608d36a
--- /dev/null
+++ 
b/xtable-api/src/main/java/org/apache/xtable/model/catalog/ThreePartHierarchicalTableIdentifier.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.model.catalog;
+
+import lombok.NonNull;
+import lombok.Value;
+
+/**
+ * An internal representation of a fully qualified table identifier within a 
catalog. The naming
+ * convention follows a three level hierarchy, few examples can be found below.
+ *
+ * <ul>
+ *   <li>1. catalog.database.table
+ *   <li>2. catalog.schema.table
+ *   <li>3. database.schema.table
+ * </ul>
+ *
+ * We have selected the first naming convention and will interoperate among 
other catalogs following
+ * a different convention.
+ */
+@Value
+public class ThreePartHierarchicalTableIdentifier implements 
HierarchicalTableIdentifier {
+  /**
+   * The top level hierarchy/namespace for organizing tables. Each catalog can 
have multiple
+   * databases/schemas. This is an optional field as many catalogs have a 
"default" catalog whose
+   * name varies depending on the catalogType.
+   */
+  String catalogName;
+  /**
+   * Catalogs have the ability to group tables logically, databaseName is the 
identifier for such
+   * logical classification. The alternate names for this field include 
namespace, schemaName etc.
+   */
+  @NonNull String databaseName;
+
+  /**
+   * The table name used when syncing the table to the catalog. NOTE: This 
name can be different
+   * from the table name in storage.
+   */
+  @NonNull String tableName;
+
+  public ThreePartHierarchicalTableIdentifier(
+      String catalogName, @NonNull String databaseName, @NonNull String 
tableName) {
+    this.catalogName = catalogName;
+    this.databaseName = databaseName;
+    this.tableName = tableName;
+  }
+
+  public ThreePartHierarchicalTableIdentifier(String databaseName, String 
tableName) {
+    this(null, databaseName, tableName);
+  }
+
+  /**
+   * Constructs a new {@code CatalogTableIdentifier} from a hierarchical 
string identifier.
+   *
+   * <p>The identifier is expected to be in one of the following formats:
+   *
+   * <ul>
+   *   <li>{@code database.table} (two parts, no catalog)
+   *   <li>{@code catalog.database.table} (three parts)
+   * </ul>
+   *
+   * If the identifier does not match one of these formats, an {@link 
IllegalArgumentException} is
+   * thrown.
+   *
+   * @param hierarchicalTableIdentifier The hierarchical string identifier 
(e.g.,
+   *     "myCatalog.myDatabase.myTable" or "myDatabase.myTable").
+   * @throws IllegalArgumentException If the provided string does not match a 
valid two-part or
+   *     three-part identifier.
+   */
+  public static ThreePartHierarchicalTableIdentifier 
fromDotSeparatedIdentifier(
+      String hierarchicalTableIdentifier) {
+    String[] parts = hierarchicalTableIdentifier.split("\\.");
+    if (parts.length == 2) {
+      return new ThreePartHierarchicalTableIdentifier(parts[0], parts[1]);
+    } else if (parts.length == 3) {
+      return new ThreePartHierarchicalTableIdentifier(parts[0], parts[1], 
parts[2]);
+    } else {
+      throw new IllegalArgumentException("Invalid table identifier " + 
hierarchicalTableIdentifier);
+    }
+  }
+
+  @Override
+  public String getId() {
+    if (catalogName != null && !catalogName.isEmpty()) {
+      return catalogName + "." + databaseName + "." + tableName;
+    }
+    return databaseName + "." + tableName;
+  }
+}
diff --git 
a/xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java 
b/xtable-api/src/main/java/org/apache/xtable/model/exception/CatalogRefreshException.java
similarity index 67%
copy from 
xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java
copy to 
xtable-api/src/main/java/org/apache/xtable/model/exception/CatalogRefreshException.java
index 920a95f4..dd322292 100644
--- a/xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java
+++ 
b/xtable-api/src/main/java/org/apache/xtable/model/exception/CatalogRefreshException.java
@@ -18,23 +18,14 @@
  
 package org.apache.xtable.model.exception;
 
-import lombok.Getter;
+/** Exception thrown when refresh operation (updating table format metadata) 
in catalog fails. */
+public class CatalogRefreshException extends InternalException {
 
-@Getter
-public enum ErrorCode {
-  INVALID_CONFIGURATION(10001),
-  INVALID_PARTITION_SPEC(10002),
-  INVALID_PARTITION_VALUE(10003),
-  READ_EXCEPTION(10004),
-  UPDATE_EXCEPTION(10005),
-  INVALID_SCHEMA(10006),
-  UNSUPPORTED_SCHEMA_TYPE(10007),
-  UNSUPPORTED_FEATURE(10008),
-  PARSE_EXCEPTION(10009);
-
-  private final int errorCode;
+  public CatalogRefreshException(String message, Throwable e) {
+    super(ErrorCode.CATALOG_REFRESH_EXCEPTION, message, e);
+  }
 
-  ErrorCode(int errorCode) {
-    this.errorCode = errorCode;
+  public CatalogRefreshException(String message) {
+    super(ErrorCode.CATALOG_REFRESH_EXCEPTION, message);
   }
 }
diff --git 
a/xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java 
b/xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java
index 920a95f4..af85c900 100644
--- a/xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java
+++ b/xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java
@@ -30,7 +30,8 @@ public enum ErrorCode {
   INVALID_SCHEMA(10006),
   UNSUPPORTED_SCHEMA_TYPE(10007),
   UNSUPPORTED_FEATURE(10008),
-  PARSE_EXCEPTION(10009);
+  PARSE_EXCEPTION(10009),
+  CATALOG_REFRESH_EXCEPTION(10010);
 
   private final int errorCode;
 
diff --git 
a/xtable-api/src/main/java/org/apache/xtable/model/sync/SyncResult.java 
b/xtable-api/src/main/java/org/apache/xtable/model/sync/SyncResult.java
index d158b38c..824f626c 100644
--- a/xtable-api/src/main/java/org/apache/xtable/model/sync/SyncResult.java
+++ b/xtable-api/src/main/java/org/apache/xtable/model/sync/SyncResult.java
@@ -20,6 +20,7 @@ package org.apache.xtable.model.sync;
 
 import java.time.Duration;
 import java.time.Instant;
+import java.util.List;
 
 import lombok.Builder;
 import lombok.Value;
@@ -30,7 +31,7 @@ import lombok.Value;
  * @since 0.1
  */
 @Value
-@Builder
+@Builder(toBuilder = true)
 public class SyncResult {
   // Mode used for the sync
   SyncMode mode;
@@ -38,10 +39,12 @@ public class SyncResult {
   Instant syncStartTime;
   // Duration
   Duration syncDuration;
-  // Status of the sync
-  SyncStatus status;
+  // Status of the tableFormat sync
+  SyncStatus tableFormatSyncStatus;
   // The Sync Mode recommended for the next sync (Usually filled on an error)
   SyncMode recommendedSyncMode;
+  // The sync status for each catalog.
+  List<CatalogSyncStatus> catalogSyncStatusList;
 
   public enum SyncStatusCode {
     SUCCESS,
@@ -57,6 +60,25 @@ public class SyncResult {
         SyncStatus.builder().statusCode(SyncStatusCode.SUCCESS).build();
     // Status code
     SyncStatusCode statusCode;
+    // errorDetails if any
+    ErrorDetails errorDetails;
+  }
+
+  /** Represents status for catalog sync status operation */
+  @Value
+  @Builder
+  public static class CatalogSyncStatus {
+    // A user defined unique catalog identifier.
+    String catalogId;
+    // Status code
+    SyncStatusCode statusCode;
+    // errorDetails if any
+    ErrorDetails errorDetails;
+  }
+
+  @Value
+  @Builder
+  public static class ErrorDetails {
     // error Message if any
     String errorMessage;
     // Readable description of the error
diff --git 
a/xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java 
b/xtable-api/src/main/java/org/apache/xtable/spi/extractor/CatalogConversionSource.java
similarity index 57%
copy from 
xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java
copy to 
xtable-api/src/main/java/org/apache/xtable/spi/extractor/CatalogConversionSource.java
index 920a95f4..616f3d45 100644
--- a/xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java
+++ 
b/xtable-api/src/main/java/org/apache/xtable/spi/extractor/CatalogConversionSource.java
@@ -16,25 +16,17 @@
  * limitations under the License.
  */
  
-package org.apache.xtable.model.exception;
+package org.apache.xtable.spi.extractor;
 
-import lombok.Getter;
+import org.apache.xtable.conversion.SourceTable;
+import org.apache.xtable.model.catalog.CatalogTableIdentifier;
 
-@Getter
-public enum ErrorCode {
-  INVALID_CONFIGURATION(10001),
-  INVALID_PARTITION_SPEC(10002),
-  INVALID_PARTITION_VALUE(10003),
-  READ_EXCEPTION(10004),
-  UPDATE_EXCEPTION(10005),
-  INVALID_SCHEMA(10006),
-  UNSUPPORTED_SCHEMA_TYPE(10007),
-  UNSUPPORTED_FEATURE(10008),
-  PARSE_EXCEPTION(10009);
-
-  private final int errorCode;
-
-  ErrorCode(int errorCode) {
-    this.errorCode = errorCode;
-  }
+/**
+ * A client for converting the table with tableIdentifier {@link 
CatalogTableIdentifier} in source
+ * catalog to SourceTable object {@link SourceTable}, can be used by 
downstream consumers for
+ * syncing it to multiple {@link org.apache.xtable.conversion.TargetTable}
+ */
+public interface CatalogConversionSource {
+  /** Returns the source table object present in the catalog. */
+  SourceTable getSourceTable(CatalogTableIdentifier tableIdentifier);
 }
diff --git 
a/xtable-api/src/main/java/org/apache/xtable/spi/extractor/ConversionSource.java
 
b/xtable-api/src/main/java/org/apache/xtable/spi/extractor/ConversionSource.java
index 2500454c..21f7f63f 100644
--- 
a/xtable-api/src/main/java/org/apache/xtable/spi/extractor/ConversionSource.java
+++ 
b/xtable-api/src/main/java/org/apache/xtable/spi/extractor/ConversionSource.java
@@ -41,6 +41,15 @@ public interface ConversionSource<COMMIT> extends Closeable {
    */
   InternalTable getTable(COMMIT commit);
 
+  /**
+   * Extracts the {@link InternalTable} as of latest state. This method is 
less expensive as
+   * compared to {@link ConversionSource#getCurrentSnapshot()} as it doesn't 
load the files present
+   * in the table.
+   *
+   * @return {@link InternalTable} representing the current table.
+   */
+  InternalTable getCurrentTable();
+
   /**
    * Extracts the {@link InternalSnapshot} as of latest state.
    *
diff --git 
a/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSync.java 
b/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSync.java
new file mode 100644
index 00000000..e76d5906
--- /dev/null
+++ b/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSync.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.spi.sync;
+
+import static 
org.apache.xtable.spi.sync.CatalogUtils.hasStorageDescriptorLocationChanged;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import lombok.extern.log4j.Log4j2;
+
+import org.apache.commons.lang3.StringUtils;
+
+import org.apache.xtable.model.InternalTable;
+import org.apache.xtable.model.catalog.CatalogTableIdentifier;
+import org.apache.xtable.model.sync.SyncResult;
+import org.apache.xtable.model.sync.SyncResult.CatalogSyncStatus;
+
+/** Provides the functionality to sync metadata from InternalTable to multiple 
target catalogs */
+@Log4j2
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class CatalogSync {
+  private static final CatalogSync INSTANCE = new CatalogSync();
+
+  public static CatalogSync getInstance() {
+    return INSTANCE;
+  }
+
+  public SyncResult syncTable(
+      Map<CatalogTableIdentifier, CatalogSyncClient> catalogSyncClients, 
InternalTable table) {
+    List<CatalogSyncStatus> results = new ArrayList<>();
+    Instant startTime = Instant.now();
+    catalogSyncClients.forEach(
+        ((tableIdentifier, catalogSyncClient) -> {
+          try {
+            results.add(syncCatalog(catalogSyncClient, tableIdentifier, 
table));
+            log.info(
+                "Catalog sync is successful for table {} with format {} using 
catalogSync {}",
+                table.getBasePath(),
+                table.getTableFormat(),
+                catalogSyncClient.getClass().getName());
+          } catch (Exception e) {
+            log.error(
+                "Catalog sync failed for table {} with format {} using 
catalogSync {}",
+                table.getBasePath(),
+                table.getTableFormat(),
+                catalogSyncClient.getClass().getName());
+            results.add(
+                getCatalogSyncFailureStatus(
+                    catalogSyncClient.getCatalogId(), 
catalogSyncClient.getClass().getName(), e));
+          }
+        }));
+    return SyncResult.builder()
+        .lastInstantSynced(table.getLatestCommitTime())
+        .syncStartTime(startTime)
+        .syncDuration(Duration.between(startTime, Instant.now()))
+        .catalogSyncStatusList(results)
+        .build();
+  }
+
+  private <TABLE> CatalogSyncStatus syncCatalog(
+      CatalogSyncClient<TABLE> catalogSyncClient,
+      CatalogTableIdentifier tableIdentifier,
+      InternalTable table) {
+    if (!catalogSyncClient.hasDatabase(tableIdentifier)) {
+      catalogSyncClient.createDatabase(tableIdentifier);
+    }
+    TABLE catalogTable = catalogSyncClient.getTable(tableIdentifier);
+    String storageDescriptorLocation = 
catalogSyncClient.getStorageLocation(catalogTable);
+    if (catalogTable == null) {
+      catalogSyncClient.createTable(table, tableIdentifier);
+    } else if (hasStorageDescriptorLocationChanged(
+        storageDescriptorLocation, table.getBasePath())) {
+      // Replace table if there is a mismatch between hmsTable location and 
Xtable basePath.
+      // Possible reasons could be:
+      //  1) hms table (manually) created with a different location before and 
need to be
+      // re-created with a new basePath
+      //  2) XTable basePath changes due to migration or other reasons
+      String oldLocation =
+          StringUtils.isEmpty(storageDescriptorLocation) ? "null" : 
storageDescriptorLocation;
+      log.warn(
+          "StorageDescriptor location changed from {} to {}, re-creating 
table",
+          oldLocation,
+          table.getBasePath());
+      catalogSyncClient.createOrReplaceTable(table, tableIdentifier);
+    } else {
+      log.debug("Table metadata changed, refreshing table");
+      catalogSyncClient.refreshTable(table, catalogTable, tableIdentifier);
+    }
+    return CatalogSyncStatus.builder()
+        .catalogId(catalogSyncClient.getCatalogId())
+        .statusCode(SyncResult.SyncStatusCode.SUCCESS)
+        .build();
+  }
+
+  private CatalogSyncStatus getCatalogSyncFailureStatus(
+      String catalogId, String catalogImpl, Exception e) {
+    return CatalogSyncStatus.builder()
+        .catalogId(catalogId)
+        .statusCode(SyncResult.SyncStatusCode.ERROR)
+        .errorDetails(
+            SyncResult.ErrorDetails.builder()
+                .errorMessage(e.getMessage())
+                .errorDescription("catalogSync failed for " + catalogImpl)
+                .build())
+        .build();
+  }
+}
diff --git 
a/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSyncClient.java 
b/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSyncClient.java
new file mode 100644
index 00000000..62de9379
--- /dev/null
+++ b/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSyncClient.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.spi.sync;
+
+import org.apache.xtable.model.InternalTable;
+import org.apache.xtable.model.catalog.CatalogTableIdentifier;
+
+/**
+ * An interface for syncing {@link InternalTable} object to {@link TABLE} 
object defined by the
+ * catalog.
+ *
+ * @param <TABLE>
+ */
+public interface CatalogSyncClient<TABLE> extends AutoCloseable {
+  /**
+   * Returns the user-defined unique identifier for the catalog, allows user 
to sync table to
+   * multiple catalogs of the same name/type eg: HMS catalog with url1, HMS 
catalog with url2.
+   */
+  String getCatalogId();
+
+  /** Returns the storage location of the table synced to the catalog. */
+  String getStorageLocation(TABLE table);
+
+  /** Checks whether a database used by tableIdentifier exists in the catalog. 
*/
+  boolean hasDatabase(CatalogTableIdentifier tableIdentifier);
+
+  /** Creates a database used by tableIdentifier in the catalog. */
+  void createDatabase(CatalogTableIdentifier tableIdentifier);
+
+  /**
+   * Return the TABLE object used by the catalog implementation. Eg: HMS uses
+   * org.apache.hadoop.hive.metastore.api.Table, Glue uses
+   * software.amazon.awssdk.services.glue.model.Table etc.
+   */
+  TABLE getTable(CatalogTableIdentifier tableIdentifier);
+
+  /**
+   * Create a table in the catalog using the canonical InternalTable 
representation and
+   * tableIdentifier.
+   */
+  void createTable(InternalTable table, CatalogTableIdentifier 
tableIdentifier);
+
+  /** Refreshes the table metadata in the catalog with tableIdentifier. */
+  void refreshTable(
+      InternalTable table, TABLE catalogTable, CatalogTableIdentifier 
tableIdentifier);
+
+  /**
+   * Tries to re-create the table in the catalog replacing state with the new 
canonical
+   * InternalTable representation and tableIdentifier.
+   */
+  void createOrReplaceTable(InternalTable table, CatalogTableIdentifier 
tableIdentifier);
+
+  /** Drops a table from the catalog. */
+  void dropTable(InternalTable table, CatalogTableIdentifier tableIdentifier);
+}
diff --git 
a/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogUtils.java 
b/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogUtils.java
new file mode 100644
index 00000000..cd3d6d5d
--- /dev/null
+++ b/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogUtils.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.spi.sync;
+
+import java.net.URI;
+import java.util.Objects;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.xtable.model.exception.CatalogRefreshException;
+
+/** Utility methods used by CatalogSync. */
+public class CatalogUtils {
+
+  /**
+   * Returns whether the location of the table in catalog is same as the one 
currently in storage.
+   *
+   * @param storageDescriptorLocation location of the table in catalog.
+   * @param tableBasePath location of the table in source table.
+   * @return equality of both the locations.
+   */
+  static boolean hasStorageDescriptorLocationChanged(
+      String storageDescriptorLocation, String tableBasePath) {
+
+    if (StringUtils.isEmpty(storageDescriptorLocation)) {
+      return true;
+    }
+    URI storageDescriptorUri = new Path(storageDescriptorLocation).toUri();
+    URI basePathUri = new Path(tableBasePath).toUri();
+
+    if (storageDescriptorUri.equals(basePathUri)
+        || storageDescriptorUri.getScheme() == null
+        || basePathUri.getScheme() == null
+        || storageDescriptorUri.getScheme().startsWith(basePathUri.getScheme())
+        || 
basePathUri.getScheme().startsWith(storageDescriptorUri.getScheme())) {
+      String storageDescriptorLocationIdentifier =
+          storageDescriptorUri.getAuthority() + storageDescriptorUri.getPath();
+      String tableBasePathIdentifier = basePathUri.getAuthority() + 
basePathUri.getPath();
+      return !Objects.equals(storageDescriptorLocationIdentifier, 
tableBasePathIdentifier);
+    }
+    throw new CatalogRefreshException(
+        String.format(
+            "Storage scheme has changed for table catalogStorageDescriptorUri 
%s basePathUri %s",
+            storageDescriptorUri, basePathUri));
+  }
+}
diff --git 
a/xtable-api/src/main/java/org/apache/xtable/spi/sync/TableFormatSync.java 
b/xtable-api/src/main/java/org/apache/xtable/spi/sync/TableFormatSync.java
index 7cd0b384..bb340669 100644
--- a/xtable-api/src/main/java/org/apache/xtable/spi/sync/TableFormatSync.java
+++ b/xtable-api/src/main/java/org/apache/xtable/spi/sync/TableFormatSync.java
@@ -166,7 +166,7 @@ public class TableFormatSync {
 
     return SyncResult.builder()
         .mode(mode)
-        .status(SyncResult.SyncStatus.SUCCESS)
+        .tableFormatSyncStatus(SyncResult.SyncStatus.SUCCESS)
         .syncStartTime(startTime)
         .syncDuration(Duration.between(startTime, Instant.now()))
         .lastInstantSynced(tableState.getLatestCommitTime())
@@ -181,12 +181,15 @@ public class TableFormatSync {
   private SyncResult buildResultForError(SyncMode mode, Instant startTime, 
Exception e) {
     return SyncResult.builder()
         .mode(mode)
-        .status(
+        .tableFormatSyncStatus(
             SyncResult.SyncStatus.builder()
                 .statusCode(SyncResult.SyncStatusCode.ERROR)
-                .errorMessage(e.getMessage())
-                .errorDescription("Failed to sync " + mode.name())
-                .canRetryOnFailure(true)
+                .errorDetails(
+                    SyncResult.ErrorDetails.builder()
+                        .errorMessage(e.getMessage())
+                        .errorDescription("Failed to sync " + mode.name())
+                        .canRetryOnFailure(true)
+                        .build())
                 .build())
         .syncStartTime(startTime)
         .syncDuration(Duration.between(startTime, Instant.now()))
diff --git 
a/xtable-api/src/test/java/org/apache/xtable/model/catalog/TestThreePartHierarchicalTableIdentifier.java
 
b/xtable-api/src/test/java/org/apache/xtable/model/catalog/TestThreePartHierarchicalTableIdentifier.java
new file mode 100644
index 00000000..5f10be0b
--- /dev/null
+++ 
b/xtable-api/src/test/java/org/apache/xtable/model/catalog/TestThreePartHierarchicalTableIdentifier.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.model.catalog;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+class TestThreePartHierarchicalTableIdentifier {
+
+  @Test
+  void testGetId() {
+    ThreePartHierarchicalTableIdentifier catalogTableIdentifier =
+        ThreePartHierarchicalTableIdentifier.fromDotSeparatedIdentifier(
+            "catalogName.databaseName.tableName");
+    assertEquals("catalogName.databaseName.tableName", 
catalogTableIdentifier.getId());
+  }
+
+  @Test
+  void testConstructorForHierarchicalTableIdentifier() {
+    Assertions.assertDoesNotThrow(
+        () ->
+            ThreePartHierarchicalTableIdentifier.fromDotSeparatedIdentifier(
+                "catalogName.databaseName.tableName"));
+    Assertions.assertDoesNotThrow(
+        () ->
+            ThreePartHierarchicalTableIdentifier.fromDotSeparatedIdentifier(
+                "databaseName.tableName"));
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () -> 
ThreePartHierarchicalTableIdentifier.fromDotSeparatedIdentifier("tableName"));
+  }
+}
diff --git 
a/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestCatalogSync.java 
b/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestCatalogSync.java
new file mode 100644
index 00000000..2f4fbdc9
--- /dev/null
+++ b/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestCatalogSync.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.spi.sync;
+
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.xtable.model.InternalTable;
+import org.apache.xtable.model.catalog.CatalogTableIdentifier;
+import org.apache.xtable.model.catalog.ThreePartHierarchicalTableIdentifier;
+import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.PartitionTransformType;
+import org.apache.xtable.model.sync.SyncResult;
+
+@ExtendWith(MockitoExtension.class)
+public class TestCatalogSync<TABLE> {
+
+  @Mock CatalogSyncClient<TABLE> mockClient1;
+  @Mock CatalogSyncClient<TABLE> mockClient2;
+  @Mock CatalogSyncClient<TABLE> mockClient3;
+  @Mock CatalogSyncClient<TABLE> mockClient4;
+
+  private final ThreePartHierarchicalTableIdentifier tableIdentifier1 =
+      new ThreePartHierarchicalTableIdentifier("database1", "table1");
+  private final ThreePartHierarchicalTableIdentifier tableIdentifier2 =
+      new ThreePartHierarchicalTableIdentifier("database2", "table2");
+  private final ThreePartHierarchicalTableIdentifier tableIdentifier3 =
+      new ThreePartHierarchicalTableIdentifier("database3", "table3");
+  private final ThreePartHierarchicalTableIdentifier tableIdentifier4 =
+      new ThreePartHierarchicalTableIdentifier("database4", "table4");
+
+  @Mock TABLE mockTable;
+  private final InternalTable internalTable =
+      InternalTable.builder()
+          .readSchema(InternalSchema.builder().name("test_schema").build())
+          .partitioningFields(
+              Collections.singletonList(
+                  InternalPartitionField.builder()
+                      
.sourceField(InternalField.builder().name("partition_field").build())
+                      .transformType(PartitionTransformType.VALUE)
+                      .build()))
+          .latestCommitTime(Instant.now().minus(10, ChronoUnit.MINUTES))
+          .basePath("/tmp/test")
+          .build();
+
+  @Test
+  void testSyncTable() {
+    when(mockClient1.hasDatabase(tableIdentifier1)).thenReturn(false);
+    when(mockClient2.hasDatabase(tableIdentifier2)).thenReturn(true);
+    when(mockClient3.hasDatabase(tableIdentifier3)).thenReturn(true);
+    when(mockClient4.hasDatabase(tableIdentifier4))
+        .thenThrow(new UnsupportedOperationException("No catalog impl"));
+
+    when(mockClient1.getTable(tableIdentifier1)).thenReturn(mockTable);
+    when(mockClient2.getTable(tableIdentifier2)).thenReturn(null);
+    when(mockClient3.getTable(tableIdentifier3)).thenReturn(mockTable);
+
+    
when(mockClient1.getStorageLocation(any())).thenReturn("/tmp/test_changed");
+    when(mockClient2.getStorageLocation(any())).thenReturn("/tmp/test");
+    when(mockClient3.getStorageLocation(any())).thenReturn("/tmp/test");
+
+    when(mockClient4.getCatalogId()).thenReturn("catalogId4");
+
+    Map<CatalogTableIdentifier, CatalogSyncClient> catalogSyncClients =
+        ImmutableMap.of(
+            tableIdentifier1, mockClient1,
+            tableIdentifier2, mockClient2,
+            tableIdentifier3, mockClient3,
+            tableIdentifier4, mockClient4);
+
+    List<SyncResult.CatalogSyncStatus> results =
+        CatalogSync.getInstance()
+            .syncTable(catalogSyncClients, internalTable)
+            .getCatalogSyncStatusList();
+    List<SyncResult.CatalogSyncStatus> errorStatus =
+        results.stream()
+            .filter(status -> 
status.getStatusCode().equals(SyncResult.SyncStatusCode.ERROR))
+            .collect(Collectors.toList());
+    assertEquals(SyncResult.SyncStatusCode.ERROR, 
errorStatus.get(0).getStatusCode());
+    assertEquals(
+        3,
+        results.stream()
+            .map(SyncResult.CatalogSyncStatus::getStatusCode)
+            .filter(statusCode -> 
statusCode.equals(SyncResult.SyncStatusCode.SUCCESS))
+            .count());
+
+    verify(mockClient1, times(1)).createDatabase(tableIdentifier1);
+    verify(mockClient1, times(1)).createOrReplaceTable(internalTable, 
tableIdentifier1);
+    verify(mockClient2, times(1)).createTable(eq(internalTable), 
eq(tableIdentifier2));
+    verify(mockClient3, times(1)).refreshTable(eq(internalTable), any(), 
eq(tableIdentifier3));
+  }
+}
diff --git 
a/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestCatalogUtils.java 
b/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestCatalogUtils.java
new file mode 100644
index 00000000..69575e8e
--- /dev/null
+++ b/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestCatalogUtils.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.spi.sync;
+
+import static 
org.apache.xtable.spi.sync.CatalogUtils.hasStorageDescriptorLocationChanged;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.util.stream.Stream;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import org.apache.xtable.model.exception.CatalogRefreshException;
+
+public class TestCatalogUtils {
+
+  static Stream<Arguments> storageLocationTestArgs() {
+    return Stream.of(
+        Arguments.of("s3://bucket/table/v1", "s3://bucket/table/v2", true),
+        Arguments.of("s3://bucket/table1/v1", "s3://bucket/table2/v1", true),
+        Arguments.of("file:///var/lib/bucket/table/v1", 
"file:///var/lib/bucket/table/v2/", true),
+        Arguments.of("s3://bucket/table/v1", "s3://bucket/table/v1", false),
+        Arguments.of("s3a://bucket/table/v1", "s3://bucket/table/v1/", false),
+        Arguments.of("s3://bucket/table/v1", "s3a://bucket/table/v1", false),
+        Arguments.of("s3://bucket/table/v1/", "s3a://bucket/table/v1", false),
+        Arguments.of("/var/lib/bucket/table/v1", "/var/lib/bucket/table/v1/", 
false),
+        Arguments.of("file:///var/lib/bucket/table/v1", 
"file:///var/lib/bucket/table/v1/", false));
+  }
+
+  static Stream<Arguments> storageLocationTestArgsException() {
+    return Stream.of(
+        Arguments.of(
+            "s3://bucket/table/v1",
+            "gs://bucket/table/v1",
+            new CatalogRefreshException(
+                "Storage scheme has changed for table 
catalogStorageDescriptorUri s3://bucket/table/v1 basePathUri 
gs://bucket/table/v1")));
+  }
+
+  @ParameterizedTest
+  @MethodSource("storageLocationTestArgs")
+  void testHasStorageLocationChanged(String storageLocation, String basePath, 
boolean expected) {
+    assertEquals(expected, 
hasStorageDescriptorLocationChanged(storageLocation, basePath));
+  }
+
+  @ParameterizedTest
+  @MethodSource("storageLocationTestArgsException")
+  void testHasStorageLocationChangedException(
+      String storageLocation, String basePath, Exception exception) {
+    assertThrows(
+        exception.getClass(),
+        () -> hasStorageDescriptorLocationChanged(storageLocation, basePath),
+        exception.getMessage());
+  }
+}
diff --git 
a/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestTableFormatSync.java 
b/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestTableFormatSync.java
index 2a9e0588..39480f8b 100644
--- 
a/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestTableFormatSync.java
+++ 
b/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestTableFormatSync.java
@@ -88,7 +88,7 @@ public class TestTableFormatSync {
 
     assertEquals(2, result.size());
     SyncResult successResult = result.get(TableFormat.DELTA);
-    assertEquals(SyncResult.SyncStatus.SUCCESS, successResult.getStatus());
+    assertEquals(SyncResult.SyncStatus.SUCCESS, 
successResult.getTableFormatSyncStatus());
     assertEquals(SyncMode.FULL, successResult.getMode());
     assertEquals(startingTableState.getLatestCommitTime(), 
successResult.getLastInstantSynced());
     assertSyncResultTimes(successResult, start);
@@ -99,11 +99,14 @@ public class TestTableFormatSync {
     assertEquals(
         SyncResult.SyncStatus.builder()
             .statusCode(SyncResult.SyncStatusCode.ERROR)
-            .errorMessage("Failure")
-            .errorDescription("Failed to sync FULL")
-            .canRetryOnFailure(true)
+            .errorDetails(
+                SyncResult.ErrorDetails.builder()
+                    .errorMessage("Failure")
+                    .errorDescription("Failed to sync FULL")
+                    .canRetryOnFailure(true)
+                    .build())
             .build(),
-        failureResult.getStatus());
+        failureResult.getTableFormatSyncStatus());
 
     verifyBaseConversionTargetCalls(
         mockConversionTarget2, startingTableState, pendingCommitInstants);
@@ -168,7 +171,8 @@ public class TestTableFormatSync {
     assertEquals(
         tableChanges.get(0).getTableAsOfChange().getLatestCommitTime(),
         partialSuccessResults.get(0).getLastInstantSynced());
-    assertEquals(SyncResult.SyncStatus.SUCCESS, 
partialSuccessResults.get(0).getStatus());
+    assertEquals(
+        SyncResult.SyncStatus.SUCCESS, 
partialSuccessResults.get(0).getTableFormatSyncStatus());
     assertSyncResultTimes(partialSuccessResults.get(0), start);
 
     assertEquals(SyncMode.INCREMENTAL, partialSuccessResults.get(1).getMode());
@@ -176,11 +180,14 @@ public class TestTableFormatSync {
     assertEquals(
         SyncResult.SyncStatus.builder()
             .statusCode(SyncResult.SyncStatusCode.ERROR)
-            .errorMessage("Failure")
-            .errorDescription("Failed to sync INCREMENTAL")
-            .canRetryOnFailure(true)
+            .errorDetails(
+                SyncResult.ErrorDetails.builder()
+                    .errorMessage("Failure")
+                    .errorDescription("Failed to sync INCREMENTAL")
+                    .canRetryOnFailure(true)
+                    .build())
             .build(),
-        partialSuccessResults.get(1).getStatus());
+        partialSuccessResults.get(1).getTableFormatSyncStatus());
 
     // Assert that all 3 changes are properly synced to the other format
     List<SyncResult> successResults = result.get(TableFormat.DELTA);
@@ -190,7 +197,7 @@ public class TestTableFormatSync {
       assertEquals(
           tableChanges.get(i).getTableAsOfChange().getLatestCommitTime(),
           successResults.get(i).getLastInstantSynced());
-      assertEquals(SyncResult.SyncStatus.SUCCESS, 
successResults.get(i).getStatus());
+      assertEquals(SyncResult.SyncStatus.SUCCESS, 
successResults.get(i).getTableFormatSyncStatus());
       assertSyncResultTimes(successResults.get(i), start);
     }
 
@@ -257,7 +264,8 @@ public class TestTableFormatSync {
     assertEquals(2, conversionTarget1Results.size());
     for (SyncResult conversionTarget1Result : conversionTarget1Results) {
       assertEquals(SyncMode.INCREMENTAL, conversionTarget1Result.getMode());
-      assertEquals(SyncResult.SyncStatus.SUCCESS, 
conversionTarget1Result.getStatus());
+      assertEquals(
+          SyncResult.SyncStatus.SUCCESS, 
conversionTarget1Result.getTableFormatSyncStatus());
       assertSyncResultTimes(conversionTarget1Result, start);
     }
     assertEquals(
@@ -275,7 +283,9 @@ public class TestTableFormatSync {
       assertEquals(
           tableChanges.get(i + 1).getTableAsOfChange().getLatestCommitTime(),
           conversionTarget2Results.get(i).getLastInstantSynced());
-      assertEquals(SyncResult.SyncStatus.SUCCESS, 
conversionTarget2Results.get(i).getStatus());
+      assertEquals(
+          SyncResult.SyncStatus.SUCCESS,
+          conversionTarget2Results.get(i).getTableFormatSyncStatus());
       assertSyncResultTimes(conversionTarget2Results.get(i), start);
     }
 
@@ -330,7 +340,7 @@ public class TestTableFormatSync {
     conversionTarget2Results.forEach(
         syncResult -> {
           assertEquals(SyncMode.INCREMENTAL, syncResult.getMode());
-          assertEquals(SyncResult.SyncStatus.SUCCESS, syncResult.getStatus());
+          assertEquals(SyncResult.SyncStatus.SUCCESS, 
syncResult.getTableFormatSyncStatus());
           assertSyncResultTimes(syncResult, start);
         });
 
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java
 
b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java
index bc5f5e02..222652a6 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java
@@ -85,7 +85,7 @@ public class ConversionController {
     if (config.getTargetTables() == null || 
config.getTargetTables().isEmpty()) {
       throw new IllegalArgumentException("Please provide at-least one format 
to sync");
     }
-    config = ConversionUtils.normalizeTargetPaths(config);
+
     try (ConversionSource<COMMIT> conversionSource =
         
conversionSourceProvider.getConversionSourceInstance(config.getSourceTable())) {
       ExtractFromSource<COMMIT> source = 
ExtractFromSource.of(conversionSource);
@@ -143,7 +143,7 @@ public class ConversionController {
   private static String getFormatsWithStatusCode(
       Map<String, SyncResult> syncResultsMerged, SyncResult.SyncStatusCode 
statusCode) {
     return syncResultsMerged.entrySet().stream()
-        .filter(entry -> entry.getValue().getStatus().getStatusCode() == 
statusCode)
+        .filter(entry -> 
entry.getValue().getTableFormatSyncStatus().getStatusCode() == statusCode)
         .map(Map.Entry::getKey)
         .collect(Collectors.joining(","));
   }
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionUtils.java 
b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionUtils.java
deleted file mode 100644
index fdeedc9b..00000000
--- 
a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionUtils.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- 
-package org.apache.xtable.conversion;
-
-import java.util.List;
-import java.util.stream.Collectors;
-
-import org.apache.xtable.model.storage.TableFormat;
-
-public class ConversionUtils {
-
-  /**
-   * Few table formats need the metadata to be located at the root level of 
the data files. Eg: An
-   * iceberg table generated through spark will have two directories 
basePath/data and
-   * basePath/metadata For synchronising the iceberg metadata to hudi and 
delta, they need to be
-   * present in basePath/data/.hoodie and basePath/data/_delta_log.
-   *
-   * @param config conversion config for synchronizing source and target tables
-   * @return updated table config.
-   */
-  public static ConversionConfig normalizeTargetPaths(ConversionConfig config) 
{
-    if 
(!config.getSourceTable().getDataPath().equals(config.getSourceTable().getBasePath())
-        && 
config.getSourceTable().getFormatName().equals(TableFormat.ICEBERG)) {
-      List<TargetTable> updatedTargetTables =
-          config.getTargetTables().stream()
-              .filter(
-                  targetTable ->
-                      targetTable.getFormatName().equals(TableFormat.HUDI)
-                          || 
targetTable.getFormatName().equals(TableFormat.DELTA))
-              .map(
-                  targetTable ->
-                      targetTable.toBuilder()
-                          .basePath(config.getSourceTable().getDataPath())
-                          .build())
-              .collect(Collectors.toList());
-      return new ConversionConfig(
-          config.getSourceTable(), updatedTargetTables, config.getSyncMode());
-    }
-    return config;
-  }
-}
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java 
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java
index a5937b02..140eb8ad 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java
@@ -83,6 +83,13 @@ public class DeltaConversionSource implements 
ConversionSource<Long> {
     return tableExtractor.table(deltaLog, tableName, version);
   }
 
+  @Override
+  public InternalTable getCurrentTable() {
+    DeltaLog deltaLog = DeltaLog.forTable(sparkSession, basePath);
+    Snapshot snapshot = deltaLog.snapshot();
+    return getTable(snapshot.version());
+  }
+
   @Override
   public InternalSnapshot getCurrentSnapshot() {
     DeltaLog deltaLog = DeltaLog.forTable(sparkSession, basePath);
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java 
b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java
index 1b1d0bf3..02423c2d 100644
--- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java
+++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java
@@ -76,6 +76,19 @@ public class HudiConversionSource implements 
ConversionSource<HoodieInstant> {
     return tableExtractor.table(metaClient, commit);
   }
 
+  @Override
+  public InternalTable getCurrentTable() {
+    HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
+    HoodieTimeline completedTimeline = 
activeTimeline.filterCompletedInstants();
+    // get latest commit
+    HoodieInstant latestCommit =
+        completedTimeline
+            .lastInstant()
+            .orElseThrow(
+                () -> new ReadException("Unable to read latest commit from 
Hudi source table"));
+    return getTable(latestCommit);
+  }
+
   @Override
   public InternalSnapshot getCurrentSnapshot() {
     HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java
 
b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java
index f96ec714..c84fb196 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java
@@ -131,6 +131,13 @@ public class IcebergConversionSource implements 
ConversionSource<Snapshot> {
         .build();
   }
 
+  @Override
+  public InternalTable getCurrentTable() {
+    Table iceTable = getSourceTable();
+    Snapshot currentSnapshot = iceTable.currentSnapshot();
+    return getTable(currentSnapshot);
+  }
+
   @Override
   public InternalSnapshot getCurrentSnapshot() {
     Table iceTable = getSourceTable();
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java 
b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
index b5ffcdf1..3d539766 100644
--- a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
+++ b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
@@ -1013,7 +1013,8 @@ public class ITConversionController {
                     TargetTable.builder()
                         .name(tableName)
                         .formatName(formatName)
-                        .basePath(table.getBasePath())
+                        // set the metadata path to the data path as the 
default (required by Hudi)
+                        .basePath(table.getDataPath())
                         .metadataRetention(metadataRetention)
                         .build())
             .collect(Collectors.toList());
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java
 
b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java
index 652bbe42..caba8046 100644
--- 
a/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java
+++ 
b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java
@@ -392,7 +392,7 @@ public class TestConversionController {
     return SyncResult.builder()
         .mode(syncMode)
         .lastInstantSynced(lastSyncedInstant)
-        .status(SyncResult.SyncStatus.SUCCESS)
+        .tableFormatSyncStatus(SyncResult.SyncStatus.SUCCESS)
         .build();
   }
 
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionUtils.java
 
b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionUtils.java
deleted file mode 100644
index b1044039..00000000
--- 
a/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionUtils.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- 
-package org.apache.xtable.conversion;
-
-import static org.junit.jupiter.api.Assertions.*;
-
-import java.util.Arrays;
-
-import org.junit.jupiter.api.Test;
-
-import org.apache.xtable.model.storage.TableFormat;
-import org.apache.xtable.model.sync.SyncMode;
-
-class TestConversionUtils {
-
-  @Test
-  void testNormalizeTargetPaths() {
-    ConversionConfig config =
-        ConversionConfig.builder()
-            .sourceTable(
-                SourceTable.builder()
-                    .name("table_name")
-                    .formatName(TableFormat.ICEBERG)
-                    .basePath("/tmp/basePath")
-                    .dataPath("/tmp/basePath/data")
-                    .build())
-            .syncMode(SyncMode.FULL)
-            .targetTables(
-                Arrays.asList(
-                    TargetTable.builder()
-                        .name("table_name")
-                        .basePath("/tmp/basePath")
-                        .formatName(TableFormat.DELTA)
-                        .build(),
-                    TargetTable.builder()
-                        .name("table_name")
-                        .basePath("/tmp/basePath")
-                        .formatName(TableFormat.HUDI)
-                        .build()))
-            .build();
-    ConversionConfig expectedNormalizedConfig =
-        ConversionConfig.builder()
-            .sourceTable(
-                SourceTable.builder()
-                    .name("table_name")
-                    .formatName(TableFormat.ICEBERG)
-                    .basePath("/tmp/basePath")
-                    .dataPath("/tmp/basePath/data")
-                    .build())
-            .syncMode(SyncMode.FULL)
-            .targetTables(
-                Arrays.asList(
-                    TargetTable.builder()
-                        .name("table_name")
-                        .basePath("/tmp/basePath/data")
-                        .formatName(TableFormat.DELTA)
-                        .build(),
-                    TargetTable.builder()
-                        .name("table_name")
-                        .basePath("/tmp/basePath/data")
-                        .formatName(TableFormat.HUDI)
-                        .build()))
-            .build();
-    ConversionConfig actualConfig = 
ConversionUtils.normalizeTargetPaths(config);
-    assertEquals(expectedNormalizedConfig, actualConfig);
-  }
-
-  @Test
-  void testNormalizeTargetPathsNoOp() {
-    ConversionConfig config =
-        ConversionConfig.builder()
-            .sourceTable(
-                SourceTable.builder()
-                    .name("table_name")
-                    .formatName(TableFormat.HUDI)
-                    .basePath("/tmp/basePath")
-                    .build())
-            .syncMode(SyncMode.FULL)
-            .targetTables(
-                Arrays.asList(
-                    TargetTable.builder()
-                        .name("table_name")
-                        .basePath("/tmp/basePath")
-                        .formatName(TableFormat.ICEBERG)
-                        .build(),
-                    TargetTable.builder()
-                        .name("table_name")
-                        .basePath("/tmp/basePath")
-                        .formatName(TableFormat.DELTA)
-                        .build()))
-            .build();
-    ConversionConfig actualConfig = 
ConversionUtils.normalizeTargetPaths(config);
-    assertEquals(config, actualConfig);
-  }
-}
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaConversionTargetSource.java
 
b/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaConversionTargetSource.java
index 8fcf0753..ca8bc3fa 100644
--- 
a/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaConversionTargetSource.java
+++ 
b/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaConversionTargetSource.java
@@ -18,6 +18,7 @@
  
 package org.apache.xtable.delta;
 
+import static org.apache.xtable.testutil.ITTestUtils.validateTable;
 import static org.junit.jupiter.api.Assertions.*;
 
 import java.net.URI;
@@ -208,6 +209,44 @@ public class ITDeltaConversionTargetSource {
         snapshot.getPartitionedDataFiles().get(0));
   }
 
+  @Test
+  void getCurrentTableTest() {
+    // Table name
+    final String tableName = GenericTable.getTableName();
+    final Path basePath = tempDir.resolve(tableName);
+    // Create table with a single row using Spark
+    sparkSession.sql(
+        "CREATE TABLE `"
+            + tableName
+            + "` USING DELTA LOCATION '"
+            + basePath
+            + "' AS SELECT * FROM VALUES (1, 2)");
+    // Create Delta source
+    SourceTable tableConfig =
+        SourceTable.builder()
+            .name(tableName)
+            .basePath(basePath.toString())
+            .formatName(TableFormat.DELTA)
+            .build();
+    DeltaConversionSource conversionSource =
+        conversionSourceProvider.getConversionSourceInstance(tableConfig);
+    // Get current table
+    InternalTable internalTable = conversionSource.getCurrentTable();
+    List<InternalField> fields = Arrays.asList(COL1_INT_FIELD, COL2_INT_FIELD);
+    validateTable(
+        internalTable,
+        tableName,
+        TableFormat.DELTA,
+        InternalSchema.builder()
+            .name("struct")
+            .dataType(InternalType.RECORD)
+            .fields(fields)
+            .build(),
+        DataLayoutStrategy.FLAT,
+        "file:" + basePath,
+        Collections.emptyList());
+  }
+
   @Test
   void getCurrentSnapshotPartitionedTest() throws URISyntaxException {
     // Table name
@@ -660,22 +699,6 @@ public class ITDeltaConversionTargetSource {
     assertEquals(PartitionTransformType.YEAR, 
partitionField.getTransformType());
   }
 
-  private static void validateTable(
-      InternalTable internalTable,
-      String tableName,
-      String tableFormat,
-      InternalSchema readSchema,
-      DataLayoutStrategy dataLayoutStrategy,
-      String basePath,
-      List<InternalPartitionField> partitioningFields) {
-    Assertions.assertEquals(tableName, internalTable.getName());
-    Assertions.assertEquals(tableFormat, internalTable.getTableFormat());
-    Assertions.assertEquals(readSchema, internalTable.getReadSchema());
-    Assertions.assertEquals(dataLayoutStrategy, 
internalTable.getLayoutStrategy());
-    Assertions.assertEquals(basePath, internalTable.getBasePath());
-    Assertions.assertEquals(partitioningFields, 
internalTable.getPartitioningFields());
-  }
-
   private void validatePartitionDataFiles(
       PartitionFileGroup expectedPartitionFiles, PartitionFileGroup 
actualPartitionFiles)
       throws URISyntaxException {
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSourceSource.java
 
b/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSourceSource.java
index c074bc23..3debf904 100644
--- 
a/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSourceSource.java
+++ 
b/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSourceSource.java
@@ -19,6 +19,7 @@
 package org.apache.xtable.hudi;
 
 import static java.util.stream.Collectors.groupingBy;
+import static org.apache.xtable.testutil.ITTestUtils.validateTable;
 import static org.junit.jupiter.api.Assertions.*;
 
 import java.nio.file.Path;
@@ -26,6 +27,7 @@ import java.time.Instant;
 import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -38,6 +40,7 @@ import java.util.stream.Stream;
 import lombok.Builder;
 import lombok.Value;
 
+import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -65,7 +68,13 @@ import org.apache.xtable.ValidationTestHelper;
 import org.apache.xtable.model.CommitsBacklog;
 import org.apache.xtable.model.InstantsForIncrementalSync;
 import org.apache.xtable.model.InternalSnapshot;
+import org.apache.xtable.model.InternalTable;
 import org.apache.xtable.model.TableChange;
+import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.InternalType;
+import org.apache.xtable.model.storage.DataLayoutStrategy;
+import org.apache.xtable.model.storage.TableFormat;
 
 /**
  * A suite of functional tests that the extraction from Hudi to Intermediate 
representation works.
@@ -98,6 +107,122 @@ public class ITHudiConversionSourceSource {
     }
   }
 
+  @Test
+  void getCurrentTableTest() {
+    String tableName = GenericTable.getTableName();
+    Path basePath = tempDir.resolve(tableName);
+    HudiTestUtil.PartitionConfig partitionConfig = 
HudiTestUtil.PartitionConfig.of(null, null);
+    Schema schema =
+        Schema.createRecord(
+            "testCurrentTable",
+            null,
+            "hudi",
+            false,
+            Arrays.asList(
+                new Schema.Field("field1", Schema.create(Schema.Type.STRING)),
+                new Schema.Field("field2", 
Schema.create(Schema.Type.STRING))));
+    try (TestJavaHudiTable table =
+        TestJavaHudiTable.withSchema(
+            tableName,
+            tempDir,
+            HudiTestUtil.PartitionConfig.of(null, null).getHudiConfig(),
+            HoodieTableType.MERGE_ON_READ,
+            schema)) {
+      table.insertRecords(5, Collections.emptyList(), false);
+      HudiConversionSource hudiClient =
+          getHudiSourceClient(
+              CONFIGURATION, table.getBasePath(), 
partitionConfig.getXTableConfig());
+      InternalTable internalTable = hudiClient.getCurrentTable();
+      InternalSchema internalSchema =
+          InternalSchema.builder()
+              .name("testCurrentTable")
+              .dataType(InternalType.RECORD)
+              .isNullable(false)
+              .fields(
+                  Arrays.asList(
+                      InternalField.builder()
+                          .name("_hoodie_commit_time")
+                          .schema(
+                              InternalSchema.builder()
+                                  .name("string")
+                                  .dataType(InternalType.STRING)
+                                  .isNullable(true)
+                                  .build())
+                          
.defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+                          .build(),
+                      InternalField.builder()
+                          .name("_hoodie_commit_seqno")
+                          .schema(
+                              InternalSchema.builder()
+                                  .name("string")
+                                  .dataType(InternalType.STRING)
+                                  .isNullable(true)
+                                  .build())
+                          
.defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+                          .build(),
+                      InternalField.builder()
+                          .name("_hoodie_record_key")
+                          .schema(
+                              InternalSchema.builder()
+                                  .name("string")
+                                  .dataType(InternalType.STRING)
+                                  .isNullable(true)
+                                  .build())
+                          
.defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+                          .build(),
+                      InternalField.builder()
+                          .name("_hoodie_partition_path")
+                          .schema(
+                              InternalSchema.builder()
+                                  .name("string")
+                                  .dataType(InternalType.STRING)
+                                  .isNullable(true)
+                                  .build())
+                          
.defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+                          .build(),
+                      InternalField.builder()
+                          .name("_hoodie_file_name")
+                          .schema(
+                              InternalSchema.builder()
+                                  .name("string")
+                                  .dataType(InternalType.STRING)
+                                  .isNullable(true)
+                                  .build())
+                          
.defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+                          .build(),
+                      InternalField.builder()
+                          .name("field1")
+                          .schema(
+                              InternalSchema.builder()
+                                  .name("string")
+                                  .dataType(InternalType.STRING)
+                                  .isNullable(false)
+                                  .build())
+                          .defaultValue(null)
+                          .build(),
+                      InternalField.builder()
+                          .name("field2")
+                          .schema(
+                              InternalSchema.builder()
+                                  .name("string")
+                                  .dataType(InternalType.STRING)
+                                  .isNullable(false)
+                                  .build())
+                          .defaultValue(null)
+                          .build()))
+              .recordKeyFields(Collections.singletonList(null))
+              .build();
+      validateTable(
+          internalTable,
+          tableName,
+          TableFormat.HUDI,
+          internalSchema,
+          DataLayoutStrategy.FLAT,
+          "file:" + basePath + "_v1",
+          Collections.emptyList());
+    }
+  }
+
   @ParameterizedTest
   @MethodSource("testsForAllTableTypesAndPartitions")
   public void insertAndUpsertData(
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/iceberg/ITIcebergConversionTargetSource.java
 
b/xtable-core/src/test/java/org/apache/xtable/iceberg/ITIcebergConversionTargetSource.java
index 3f20ac9d..acd88688 100644
--- 
a/xtable-core/src/test/java/org/apache/xtable/iceberg/ITIcebergConversionTargetSource.java
+++ 
b/xtable-core/src/test/java/org/apache/xtable/iceberg/ITIcebergConversionTargetSource.java
@@ -21,6 +21,7 @@ package org.apache.xtable.iceberg;
 import static org.apache.xtable.GenericTable.getTableName;
 import static org.apache.xtable.ValidationTestHelper.validateSnapshot;
 import static org.apache.xtable.ValidationTestHelper.validateTableChanges;
+import static org.apache.xtable.testutil.ITTestUtils.validateTable;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -29,6 +30,8 @@ import java.nio.file.Path;
 import java.time.Instant;
 import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -49,9 +52,14 @@ import org.apache.xtable.conversion.SourceTable;
 import org.apache.xtable.model.CommitsBacklog;
 import org.apache.xtable.model.InstantsForIncrementalSync;
 import org.apache.xtable.model.InternalSnapshot;
+import org.apache.xtable.model.InternalTable;
 import org.apache.xtable.model.TableChange;
+import org.apache.xtable.model.schema.InternalField;
 import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.InternalType;
 import org.apache.xtable.model.schema.PartitionTransformType;
+import org.apache.xtable.model.storage.DataLayoutStrategy;
 import org.apache.xtable.model.storage.TableFormat;
 
 public class ITIcebergConversionTargetSource {
@@ -66,6 +74,67 @@ public class ITIcebergConversionTargetSource {
     sourceProvider.init(hadoopConf);
   }
 
+  @Test
+  void getCurrentTableTest() {
+    String tableName = getTableName();
+    try (TestIcebergTable testIcebergTable =
+        new TestIcebergTable(
+            tableName,
+            tempDir,
+            hadoopConf,
+            "field1",
+            Collections.singletonList(null),
+            TestIcebergDataHelper.SchemaType.BASIC)) {
+      testIcebergTable.insertRows(50);
+      SourceTable tableConfig =
+          SourceTable.builder()
+              .name(testIcebergTable.getTableName())
+              .basePath(testIcebergTable.getBasePath())
+              .formatName(TableFormat.ICEBERG)
+              .build();
+      IcebergConversionSource conversionSource =
+          sourceProvider.getConversionSourceInstance(tableConfig);
+      InternalTable internalTable = conversionSource.getCurrentTable();
+      InternalSchema internalSchema =
+          InternalSchema.builder()
+              .name("record")
+              .dataType(InternalType.RECORD)
+              .fields(
+                  Arrays.asList(
+                      InternalField.builder()
+                          .name("field1")
+                          .fieldId(1)
+                          .schema(
+                              InternalSchema.builder()
+                                  .name("string")
+                                  .dataType(InternalType.STRING)
+                                  .isNullable(true)
+                                  .build())
+                          
.defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+                          .build(),
+                      InternalField.builder()
+                          .name("field2")
+                          .fieldId(2)
+                          .schema(
+                              InternalSchema.builder()
+                                  .name("string")
+                                  .dataType(InternalType.STRING)
+                                  .isNullable(true)
+                                  .build())
+                          
.defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+                          .build()))
+              .build();
+      validateTable(
+          internalTable,
+          testIcebergTable.getBasePath(),
+          TableFormat.ICEBERG,
+          internalSchema,
+          DataLayoutStrategy.FLAT,
+          testIcebergTable.getBasePath(),
+          Collections.emptyList());
+    }
+  }
+
   @ParameterizedTest
   @ValueSource(booleans = {false, true})
   public void testInsertsUpsertsAndDeletes(boolean isPartitioned) {
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergDataHelper.java
 
b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergDataHelper.java
index 1d10fe7a..d90ba169 100644
--- 
a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergDataHelper.java
+++ 
b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergDataHelper.java
@@ -56,6 +56,10 @@ import org.apache.iceberg.types.Types.NestedField;
 @Value
 public class TestIcebergDataHelper {
   private static final Random RANDOM = new Random();
+  private static final List<Types.NestedField> BASIC_FIELDS =
+      Arrays.asList(
+          NestedField.optional(1, "field1", Types.StringType.get()),
+          NestedField.optional(2, "field2", Types.StringType.get()));
   private static final List<Types.NestedField> COMMON_FIELDS =
       Arrays.asList(
           NestedField.optional(1, "id", Types.StringType.get()),
@@ -114,6 +118,7 @@ public class TestIcebergDataHelper {
   private static final Schema SCHEMA_WITH_UUID_COLUMN =
       new Schema(
           Stream.concat(COMMON_FIELDS.stream(), 
UUID_FIELDS.stream()).collect(Collectors.toList()));
+  private static final Schema BASIC_SCHEMA = new Schema(BASIC_FIELDS);
   private static final OffsetDateTime EPOCH = 
Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
   private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
 
@@ -122,6 +127,7 @@ public class TestIcebergDataHelper {
   List<String> partitionFieldNames;
 
   public static enum SchemaType {
+    BASIC,
     COMMON,
     COMMON_WITH_ADDITIONAL_COLUMNS,
     COMMON_WITH_UUID_COLUMN,
@@ -145,6 +151,8 @@ public class TestIcebergDataHelper {
         return SCHEMA_WITH_ADDITIONAL_COLUMNS;
       case COMMON_WITH_UUID_COLUMN:
         return SCHEMA_WITH_UUID_COLUMN;
+      case BASIC:
+        return BASIC_SCHEMA;
       default:
         throw new IllegalArgumentException("Unknown schema type: " + 
schemaType);
     }
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/testutil/ITTestUtils.java 
b/xtable-core/src/test/java/org/apache/xtable/testutil/ITTestUtils.java
new file mode 100644
index 00000000..281e61fe
--- /dev/null
+++ b/xtable-core/src/test/java/org/apache/xtable/testutil/ITTestUtils.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.testutil;
+
+import java.util.List;
+
+import org.junit.jupiter.api.Assertions;
+
+import org.apache.xtable.model.InternalTable;
+import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.storage.DataLayoutStrategy;
+
+public class ITTestUtils {
+
+  public static void validateTable(
+      InternalTable internalTable,
+      String tableName,
+      String tableFormat,
+      InternalSchema readSchema,
+      DataLayoutStrategy dataLayoutStrategy,
+      String basePath,
+      List<InternalPartitionField> partitioningFields) {
+    Assertions.assertEquals(tableName, internalTable.getName());
+    Assertions.assertEquals(tableFormat, internalTable.getTableFormat());
+    Assertions.assertEquals(readSchema, internalTable.getReadSchema());
+    Assertions.assertEquals(dataLayoutStrategy, 
internalTable.getLayoutStrategy());
+    Assertions.assertEquals(basePath, internalTable.getBasePath());
+    Assertions.assertEquals(partitioningFields, 
internalTable.getPartitioningFields());
+  }
+}
diff --git 
a/xtable-hudi-support/xtable-hudi-support-extensions/src/main/java/org/apache/xtable/hudi/sync/XTableSyncTool.java
 
b/xtable-hudi-support/xtable-hudi-support-extensions/src/main/java/org/apache/xtable/hudi/sync/XTableSyncTool.java
index a9653eb5..7491cec4 100644
--- 
a/xtable-hudi-support/xtable-hudi-support-extensions/src/main/java/org/apache/xtable/hudi/sync/XTableSyncTool.java
+++ 
b/xtable-hudi-support/xtable-hudi-support-extensions/src/main/java/org/apache/xtable/hudi/sync/XTableSyncTool.java
@@ -106,7 +106,7 @@ public class XTableSyncTool extends HoodieSyncTool {
         results.entrySet().stream()
             .filter(
                 entry ->
-                    entry.getValue().getStatus().getStatusCode()
+                    entry.getValue().getTableFormatSyncStatus().getStatusCode()
                         != SyncResult.SyncStatusCode.SUCCESS)
             .map(Map.Entry::getKey)
             .collect(Collectors.joining(","));

Reply via email to