This is an automated email from the ASF dual-hosted git repository.

vinish pushed a commit to branch 590-CatalogSync-CoreLibrary
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git

commit 96daddef834bf8869c057f3799a1ecf1c3c5e49d
Author: Vinish Reddy <[email protected]>
AuthorDate: Tue Dec 10 00:25:56 2024 -0800

    Add syncCatalogs in ConversionController
---
 .../apache/xtable/conversion/ConversionConfig.java |  9 +++-
 .../org/apache/xtable/model/sync/SyncResult.java   |  2 +-
 .../xtable/spi/extractor/ConversionSource.java     |  7 +++
 .../org/apache/xtable/spi/sync/CatalogSync.java    |  6 +--
 .../xtable/catalog/ExternalCatalogConfig.java      | 34 ++++++---------
 .../conversion/CatalogSyncClientFactory.java       | 51 ++++++++++++++++++++++
 .../xtable/conversion/ConversionController.java    | 51 +++++++++++++++++++++-
 .../apache/xtable/delta/DeltaConversionSource.java |  7 +++
 .../apache/xtable/hudi/HudiConversionSource.java   | 13 ++++++
 .../xtable/iceberg/IcebergConversionSource.java    |  7 +++
 .../conversion/TestConversionController.java       | 39 ++++++++++++++---
 .../java/org/apache/xtable/utilities/RunSync.java  | 48 ++++++++++++++++++++
 12 files changed, 242 insertions(+), 32 deletions(-)

diff --git 
a/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java 
b/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java
index 73e2628d..cab6bf1e 100644
--- 
a/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java
+++ 
b/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java
@@ -18,6 +18,7 @@
  
 package org.apache.xtable.conversion;
 
+import java.util.Collections;
 import java.util.List;
 
 import lombok.Builder;
@@ -36,15 +37,21 @@ public class ConversionConfig {
   List<TargetTable> targetTables;
   // The mode, incremental or snapshot
   SyncMode syncMode;
+  // One or more external catalogs to sync the table metadata to
+  List<ExternalCatalog> externalCatalogs;
 
   @Builder
   ConversionConfig(
-      @NonNull SourceTable sourceTable, List<TargetTable> targetTables, 
SyncMode syncMode) {
+      @NonNull SourceTable sourceTable,
+      List<TargetTable> targetTables,
+      SyncMode syncMode,
+      List<ExternalCatalog> externalCatalogs) {
     this.sourceTable = sourceTable;
     this.targetTables = targetTables;
     Preconditions.checkArgument(
         targetTables != null && !targetTables.isEmpty(),
         "Please provide at-least one format to sync");
     this.syncMode = syncMode == null ? SyncMode.INCREMENTAL : syncMode;
+    this.externalCatalogs = externalCatalogs == null ? Collections.emptyList() 
: externalCatalogs;
   }
 }
diff --git 
a/xtable-api/src/main/java/org/apache/xtable/model/sync/SyncResult.java 
b/xtable-api/src/main/java/org/apache/xtable/model/sync/SyncResult.java
index ef06b9ca..1d7ee8db 100644
--- a/xtable-api/src/main/java/org/apache/xtable/model/sync/SyncResult.java
+++ b/xtable-api/src/main/java/org/apache/xtable/model/sync/SyncResult.java
@@ -31,7 +31,7 @@ import lombok.Value;
  * @since 0.1
  */
 @Value
-@Builder
+@Builder(toBuilder = true)
 public class SyncResult {
   // Mode used for the sync
   SyncMode mode;
diff --git 
a/xtable-api/src/main/java/org/apache/xtable/spi/extractor/ConversionSource.java
 
b/xtable-api/src/main/java/org/apache/xtable/spi/extractor/ConversionSource.java
index 2500454c..6baef45d 100644
--- 
a/xtable-api/src/main/java/org/apache/xtable/spi/extractor/ConversionSource.java
+++ 
b/xtable-api/src/main/java/org/apache/xtable/spi/extractor/ConversionSource.java
@@ -48,6 +48,13 @@ public interface ConversionSource<COMMIT> extends Closeable {
    */
   InternalSnapshot getCurrentSnapshot();
 
+  /**
+   * Extracts the {@link InternalTable} as of latest state.
+   *
+   * @return {@link InternalTable} representing the current state.
+   */
+  InternalTable getCurrentTable();
+
   /**
    * Extracts a {@link TableChange} for the provided commit.
    *
diff --git 
a/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSync.java 
b/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSync.java
index 1b4dcb0c..601068f9 100644
--- a/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSync.java
+++ b/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSync.java
@@ -46,10 +46,10 @@ public class CatalogSync {
     return INSTANCE;
   }
 
-  public <TABLE> Map<String, List<CatalogSyncStatus>> syncTable(
-      Collection<CatalogSyncClient<TABLE>> catalogSyncClients, InternalTable 
table) {
+  public Map<String, List<CatalogSyncStatus>> syncTable(
+      Collection<CatalogSyncClient> catalogSyncClients, InternalTable table) {
     Map<String, List<CatalogSyncStatus>> results = new HashMap<>();
-    for (CatalogSyncClient<TABLE> catalogSyncClient : catalogSyncClients) {
+    for (CatalogSyncClient<?> catalogSyncClient : catalogSyncClients) {
       results.computeIfAbsent(catalogSyncClient.getTableFormat(), k -> new 
ArrayList<>());
       try {
         results
diff --git 
a/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java 
b/xtable-core/src/main/java/org/apache/xtable/catalog/ExternalCatalogConfig.java
similarity index 52%
copy from 
xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java
copy to 
xtable-core/src/main/java/org/apache/xtable/catalog/ExternalCatalogConfig.java
index 73e2628d..410a6e38 100644
--- 
a/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/catalog/ExternalCatalogConfig.java
@@ -16,35 +16,27 @@
  * limitations under the License.
  */
  
-package org.apache.xtable.conversion;
+package org.apache.xtable.catalog;
 
-import java.util.List;
+import java.util.Collections;
+import java.util.Map;
 
 import lombok.Builder;
 import lombok.NonNull;
 import lombok.Value;
 
-import com.google.common.base.Preconditions;
-
-import org.apache.xtable.model.sync.SyncMode;
+import org.apache.xtable.conversion.CatalogConfig;
 
 @Value
-public class ConversionConfig {
-  // The source of the sync
-  @NonNull SourceTable sourceTable;
-  // One or more targets to sync the table metadata to
-  List<TargetTable> targetTables;
-  // The mode, incremental or snapshot
-  SyncMode syncMode;
+@Builder
+public class ExternalCatalogConfig implements CatalogConfig {
+  @NonNull String catalogImpl;
+  @NonNull String catalogName;
+  @NonNull @Builder.Default Map<String, String> catalogOptions = 
Collections.emptyMap();
 
-  @Builder
-  ConversionConfig(
-      @NonNull SourceTable sourceTable, List<TargetTable> targetTables, 
SyncMode syncMode) {
-    this.sourceTable = sourceTable;
-    this.targetTables = targetTables;
-    Preconditions.checkArgument(
-        targetTables != null && !targetTables.isEmpty(),
-        "Please provide at-least one format to sync");
-    this.syncMode = syncMode == null ? SyncMode.INCREMENTAL : syncMode;
+  public static ExternalCatalogConfig fromCatalogType(
+      String catalogType, String catalogName, Map<String, String> 
catalogOptions) {
+    // TODO: Return ExternalCatalogConfig based on catalogType - HMS, Glue etc.
+    return ExternalCatalogConfig.builder().build();
   }
 }
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/conversion/CatalogSyncClientFactory.java
 
b/xtable-core/src/main/java/org/apache/xtable/conversion/CatalogSyncClientFactory.java
new file mode 100644
index 00000000..249613f0
--- /dev/null
+++ 
b/xtable-core/src/main/java/org/apache/xtable/conversion/CatalogSyncClientFactory.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.conversion;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.xtable.reflection.ReflectionUtils;
+import org.apache.xtable.spi.sync.CatalogSyncClient;
+
+public class CatalogSyncClientFactory {
+  private static final CatalogSyncClientFactory INSTANCE = new 
CatalogSyncClientFactory();
+
+  public static CatalogSyncClientFactory getInstance() {
+    return INSTANCE;
+  }
+
+  public List<CatalogSyncClient<?>> createForCatalog(
+      ExternalCatalog externalCatalog, Configuration configuration) {
+    return externalCatalog.getTableFormatsToSync().keySet().stream()
+        .map(tableFormat -> createForTableFormat(externalCatalog, tableFormat, 
configuration))
+        .collect(Collectors.toList());
+  }
+
+  private CatalogSyncClient<?> createForTableFormat(
+      ExternalCatalog externalCatalog, String tableFormat, Configuration 
configuration) {
+    return ReflectionUtils.createInstanceOfClass(
+        externalCatalog.getCatalogConfig().getCatalogImpl(),
+        externalCatalog,
+        tableFormat,
+        configuration);
+  }
+}
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java
 
b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java
index dc665969..0d4c215b 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java
@@ -44,8 +44,11 @@ import org.apache.xtable.model.InternalSnapshot;
 import org.apache.xtable.model.metadata.TableSyncMetadata;
 import org.apache.xtable.model.sync.SyncMode;
 import org.apache.xtable.model.sync.SyncResult;
+import org.apache.xtable.model.sync.SyncResult.CatalogSyncStatus;
 import org.apache.xtable.spi.extractor.ConversionSource;
 import org.apache.xtable.spi.extractor.ExtractFromSource;
+import org.apache.xtable.spi.sync.CatalogSync;
+import org.apache.xtable.spi.sync.CatalogSyncClient;
 import org.apache.xtable.spi.sync.ConversionTarget;
 import org.apache.xtable.spi.sync.TableFormatSync;
 
@@ -64,10 +67,17 @@ import org.apache.xtable.spi.sync.TableFormatSync;
 public class ConversionController {
   private final Configuration conf;
   private final ConversionTargetFactory conversionTargetFactory;
+  private final CatalogSyncClientFactory catalogSyncClientFactory;
   private final TableFormatSync tableFormatSync;
+  private final CatalogSync catalogSync;
 
   public ConversionController(Configuration conf) {
-    this(conf, ConversionTargetFactory.getInstance(), 
TableFormatSync.getInstance());
+    this(
+        conf,
+        ConversionTargetFactory.getInstance(),
+        CatalogSyncClientFactory.getInstance(),
+        TableFormatSync.getInstance(),
+        CatalogSync.getInstance());
   }
 
   /**
@@ -134,6 +144,10 @@ public class ConversionController {
       if (!failedSyncs.isEmpty()) {
         log.error("Sync failed for the following formats {}", failedSyncs);
       }
+      // Sync to catalogs.
+      Map<String, List<CatalogSyncStatus>> catalogSyncResults =
+          syncCatalogs(config, conversionSourceProvider);
+      mergeSyncResults(syncResultsMerged, catalogSyncResults);
       return syncResultsMerged;
     } catch (IOException ioException) {
       throw new ReadException("Failed to close source converter", ioException);
@@ -211,6 +225,22 @@ public class ConversionController {
     return 
SyncResultForTableFormats.builder().lastSyncResult(syncResultsByFormat).build();
   }
 
+  private <COMMIT> Map<String, List<CatalogSyncStatus>> syncCatalogs(
+      ConversionConfig config, ConversionSourceProvider<COMMIT> 
conversionSourceProvider) {
+    List<CatalogSyncClient> catalogSyncClients =
+        config.getExternalCatalogs().stream()
+            .flatMap(
+                externalCatalog ->
+                    catalogSyncClientFactory.createForCatalog(externalCatalog, 
conf).stream())
+            .collect(Collectors.toList());
+    try (ConversionSource<COMMIT> conversionSource =
+        
conversionSourceProvider.getConversionSourceInstance(config.getSourceTable())) {
+      return catalogSync.syncTable(catalogSyncClients, 
conversionSource.getCurrentTable());
+    } catch (IOException e) {
+      throw new ReadException("Failed to close source converter during 
syncCatalogs", e);
+    }
+  }
+
   /**
    * Checks if incremental sync is sufficient for a target table format.
    *
@@ -268,6 +298,25 @@ public class ConversionController {
         .build();
   }
 
+  private void mergeSyncResults(
+      Map<String, SyncResult> syncResultsMerged,
+      Map<String, List<CatalogSyncStatus>> catalogSyncResults) {
+    catalogSyncResults.forEach(
+        (tableFormat, results) -> {
+          syncResultsMerged.computeIfPresent(
+              tableFormat,
+              (k, syncResult) -> 
syncResult.toBuilder().catalogSyncStatusList(results).build());
+          syncResultsMerged.computeIfAbsent(
+              tableFormat,
+              k ->
+                  SyncResult.builder()
+                      .mode(SyncMode.INCREMENTAL)
+                      .status(SyncResult.SyncStatus.SUCCESS)
+                      .catalogSyncStatusList(results)
+                      .build());
+        });
+  }
+
   @Value
   @Builder
   private static class SyncResultForTableFormats {
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java 
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java
index 19ecc02c..28e9ef2c 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java
@@ -92,6 +92,13 @@ public class DeltaConversionSource implements 
ConversionSource<Long> {
         .build();
   }
 
+  @Override
+  public InternalTable getCurrentTable() {
+    DeltaLog deltaLog = DeltaLog.forTable(sparkSession, basePath);
+    Snapshot snapshot = deltaLog.snapshot();
+    return getTable(snapshot.version());
+  }
+
   @Override
   public TableChange getTableChangeForCommit(Long versionNumber) {
     InternalTable tableAtVersion = tableExtractor.table(deltaLog, tableName, 
versionNumber);
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java 
b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java
index 1b1d0bf3..2993776c 100644
--- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java
+++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java
@@ -104,6 +104,19 @@ public class HudiConversionSource implements 
ConversionSource<HoodieInstant> {
         .build();
   }
 
+  @Override
+  public InternalTable getCurrentTable() {
+    HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
+    HoodieTimeline completedTimeline = 
activeTimeline.filterCompletedInstants();
+    // get latest commit
+    HoodieInstant latestCommit =
+        completedTimeline
+            .lastInstant()
+            .orElseThrow(
+                () -> new ReadException("Unable to read latest commit from 
Hudi source table"));
+    return getTable(latestCommit);
+  }
+
   @Override
   public TableChange getTableChangeForCommit(HoodieInstant 
hoodieInstantForDiff) {
     HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java
 
b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java
index f96ec714..2d049d55 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java
@@ -160,6 +160,13 @@ public class IcebergConversionSource implements 
ConversionSource<Snapshot> {
         .build();
   }
 
+  @Override
+  public InternalTable getCurrentTable() {
+    Table iceTable = getSourceTable();
+    Snapshot currentSnapshot = iceTable.currentSnapshot();
+    return getTable(currentSnapshot);
+  }
+
   private InternalDataFile fromIceberg(
       DataFile file, PartitionSpec partitionSpec, InternalTable internalTable) 
{
     List<PartitionValue> partitionValues =
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java
 
b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java
index 652bbe42..e1bcb99a 100644
--- 
a/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java
+++ 
b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java
@@ -54,6 +54,7 @@ import org.apache.xtable.model.storage.TableFormat;
 import org.apache.xtable.model.sync.SyncMode;
 import org.apache.xtable.model.sync.SyncResult;
 import org.apache.xtable.spi.extractor.ConversionSource;
+import org.apache.xtable.spi.sync.CatalogSync;
 import org.apache.xtable.spi.sync.ConversionTarget;
 import org.apache.xtable.spi.sync.TableFormatSync;
 
@@ -65,7 +66,10 @@ public class TestConversionController {
   private final ConversionSource<Instant> mockConversionSource = 
mock(ConversionSource.class);
   private final ConversionTargetFactory mockConversionTargetFactory =
       mock(ConversionTargetFactory.class);
+  private final CatalogSyncClientFactory mockCatalogSyncClientFactory =
+      mock(CatalogSyncClientFactory.class);
   private final TableFormatSync tableFormatSync = mock(TableFormatSync.class);
+  private final CatalogSync catalogSync = mock(CatalogSync.class);
   private final ConversionTarget mockConversionTarget1 = 
mock(ConversionTarget.class);
   private final ConversionTarget mockConversionTarget2 = 
mock(ConversionTarget.class);
 
@@ -96,7 +100,12 @@ public class TestConversionController {
             eq(internalSnapshot)))
         .thenReturn(perTableResults);
     ConversionController conversionController =
-        new ConversionController(mockConf, mockConversionTargetFactory, 
tableFormatSync);
+        new ConversionController(
+            mockConf,
+            mockConversionTargetFactory,
+            mockCatalogSyncClientFactory,
+            tableFormatSync,
+            catalogSync);
     Map<String, SyncResult> result =
         conversionController.sync(conversionConfig, 
mockConversionSourceProvider);
     assertEquals(perTableResults, result);
@@ -182,7 +191,12 @@ public class TestConversionController {
     expectedSyncResult.put(TableFormat.ICEBERG, 
getLastSyncResult(icebergSyncResults));
     expectedSyncResult.put(TableFormat.DELTA, 
getLastSyncResult(deltaSyncResults));
     ConversionController conversionController =
-        new ConversionController(mockConf, mockConversionTargetFactory, 
tableFormatSync);
+        new ConversionController(
+            mockConf,
+            mockConversionTargetFactory,
+            mockCatalogSyncClientFactory,
+            tableFormatSync,
+            catalogSync);
     Map<String, SyncResult> result =
         conversionController.sync(conversionConfig, 
mockConversionSourceProvider);
     assertEquals(expectedSyncResult, result);
@@ -226,7 +240,12 @@ public class TestConversionController {
             eq(internalSnapshot)))
         .thenReturn(syncResults);
     ConversionController conversionController =
-        new ConversionController(mockConf, mockConversionTargetFactory, 
tableFormatSync);
+        new ConversionController(
+            mockConf,
+            mockConversionTargetFactory,
+            mockCatalogSyncClientFactory,
+            tableFormatSync,
+            catalogSync);
     Map<String, SyncResult> result =
         conversionController.sync(conversionConfig, 
mockConversionSourceProvider);
     assertEquals(syncResults, result);
@@ -310,7 +329,12 @@ public class TestConversionController {
     expectedSyncResult.put(TableFormat.ICEBERG, syncResult);
     expectedSyncResult.put(TableFormat.DELTA, 
getLastSyncResult(deltaSyncResults));
     ConversionController conversionController =
-        new ConversionController(mockConf, mockConversionTargetFactory, 
tableFormatSync);
+        new ConversionController(
+            mockConf,
+            mockConversionTargetFactory,
+            mockCatalogSyncClientFactory,
+            tableFormatSync,
+            catalogSync);
     Map<String, SyncResult> result =
         conversionController.sync(conversionConfig, 
mockConversionSourceProvider);
     assertEquals(expectedSyncResult, result);
@@ -368,7 +392,12 @@ public class TestConversionController {
     // Iceberg and Delta have no commits to sync
     Map<String, SyncResult> expectedSyncResult = Collections.emptyMap();
     ConversionController conversionController =
-        new ConversionController(mockConf, mockConversionTargetFactory, 
tableFormatSync);
+        new ConversionController(
+            mockConf,
+            mockConversionTargetFactory,
+            mockCatalogSyncClientFactory,
+            tableFormatSync,
+            catalogSync);
     Map<String, SyncResult> result =
         conversionController.sync(conversionConfig, 
mockConversionSourceProvider);
     assertEquals(expectedSyncResult, result);
diff --git 
a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java 
b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java
index c84753de..b3a352e8 100644
--- a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java
+++ b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java
@@ -37,6 +37,7 @@ import org.apache.commons.cli.DefaultParser;
 import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 
 import com.fasterxml.jackson.annotation.JsonMerge;
@@ -46,9 +47,12 @@ import com.fasterxml.jackson.databind.ObjectReader;
 import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
 import com.google.common.annotations.VisibleForTesting;
 
+import org.apache.xtable.catalog.ExternalCatalogConfig;
 import org.apache.xtable.conversion.ConversionConfig;
 import org.apache.xtable.conversion.ConversionController;
 import org.apache.xtable.conversion.ConversionSourceProvider;
+import org.apache.xtable.conversion.ExternalCatalog;
+import org.apache.xtable.conversion.ExternalCatalog.TableIdentifier;
 import org.apache.xtable.conversion.SourceTable;
 import org.apache.xtable.conversion.TargetTable;
 import org.apache.xtable.hudi.HudiSourceConfig;
@@ -69,6 +73,7 @@ public class RunSync {
   private static final String HADOOP_CONFIG_PATH = "p";
   private static final String CONVERTERS_CONFIG_PATH = "c";
   private static final String ICEBERG_CATALOG_CONFIG_PATH = "i";
+  private static final String EXTERNAL_CATALOG_CONFIG_PATH = "e";
   private static final String HELP_OPTION = "h";
 
   private static final Options OPTIONS =
@@ -181,11 +186,38 @@ public class RunSync {
                           .build())
               .collect(Collectors.toList());
 
+      List<ExternalCatalog> externalCatalogs =
+          datasetConfig.getCatalogs().stream()
+              .map(
+                  catalog -> {
+                    ExternalCatalog.ExternalCatalogBuilder builder =
+                        ExternalCatalog.builder()
+                            .catalogIdentifier(catalog.getCatalogIdentifier())
+                            
.tableFormatsToSync(catalog.getCatalogTableIdentifiers());
+                    if (StringUtils.isEmpty(catalog.getCatalogImplClass())) {
+                      builder.catalogConfig(
+                          ExternalCatalogConfig.builder()
+                              .catalogImpl(catalog.getCatalogImplClass())
+                              .catalogOptions(catalog.getCatalogProperties())
+                              .catalogName(catalog.getCatalogIdentifier())
+                              .build());
+                    } else {
+                      builder.catalogConfig(
+                          ExternalCatalogConfig.fromCatalogType(
+                              catalog.getCatalogType(),
+                              catalog.getCatalogIdentifier(),
+                              catalog.getCatalogProperties()));
+                    }
+                    return builder.build();
+                  })
+              .collect(Collectors.toList());
+
       ConversionConfig conversionConfig =
           ConversionConfig.builder()
               .sourceTable(sourceTable)
               .targetTables(targetTables)
               .syncMode(SyncMode.INCREMENTAL)
+              .externalCatalogs(externalCatalogs)
               .build();
       try {
         conversionController.sync(conversionConfig, conversionSourceProvider);
@@ -258,6 +290,9 @@ public class RunSync {
     /** Configuration of the dataset to sync, path, table name, etc. */
     List<Table> datasets;
 
+    /** List of catalogs to sync */
+    List<Catalog> catalogs;
+
     @Data
     public static class Table {
       /**
@@ -272,6 +307,19 @@ public class RunSync {
       String partitionSpec;
       String namespace;
     }
+
+    @Data
+    public static class Catalog {
+      String catalogIdentifier;
+
+      String catalogImplClass;
+
+      String catalogType;
+
+      Map<String, String> catalogProperties;
+
+      Map<String, TableIdentifier> catalogTableIdentifiers;
+    }
   }
 
   @Data

Reply via email to