This is an automated email from the ASF dual-hosted git repository.

vinish pushed a commit to branch 590-CatalogSync
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git


The following commit(s) were added to refs/heads/590-CatalogSync by this push:
     new fce3524d Add tests - Part2
fce3524d is described below

commit fce3524d5fed70b5c50fd0b2437d4a8e0538c974
Author: Vinish Reddy <[email protected]>
AuthorDate: Thu Dec 12 17:56:34 2024 -0800

    Add tests - Part2
---
 .../apache/xtable/conversion/ExternalTable.java    |   6 -
 .../org/apache/xtable/conversion/SourceTable.java  |   6 +-
 .../org/apache/xtable/conversion/TargetTable.java  |   3 +-
 .../xtable/conversion/TestExternalTable.java       |  12 +-
 .../xtable/catalog/CatalogConversionFactory.java   |   2 -
 .../xtable/catalog/ExternalCatalogConfig.java      |   7 +-
 .../xtable/conversion/ConversionController.java    |  13 ++-
 .../apache/xtable/conversion/ConversionUtils.java  |  16 ++-
 .../conversion/TestConversionController.java       | 122 +++++++++++++++++++++
 .../apache/xtable/utilities/RunCatalogSync.java    |  30 ++++-
 .../xtable/utilities/TestRunCatalogSync.java       |  99 ++++++++++++++++-
 .../{main => test}/resources/catalogConfig.yaml    |  23 ++--
 12 files changed, 294 insertions(+), 45 deletions(-)

diff --git a/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalTable.java b/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalTable.java
index 0dda90f3..939c59c0 100644
--- a/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalTable.java
+++ b/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalTable.java
@@ -20,8 +20,6 @@ package org.apache.xtable.conversion;
 
 import java.util.Properties;
 
-import javax.annotation.Nullable;
-
 import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import lombok.NonNull;
@@ -40,8 +38,6 @@ class ExternalTable {
   protected final @NonNull String formatName;
   /** The path to the root of the table or the metadata directory depending on the format */
   protected final @NonNull String basePath;
-  /** The path to the data files, defaults to the metadataPath */
-  protected final @NonNull String dataPath;
   /** Optional namespace for the table */
   protected final String[] namespace;
   /** The configuration for interacting with the catalog that manages this table */
@@ -54,14 +50,12 @@ class ExternalTable {
       @NonNull String name,
       @NonNull String formatName,
       @NonNull String basePath,
-      @Nullable String dataPath,
       String[] namespace,
       CatalogConfig catalogConfig,
       Properties additionalProperties) {
     this.name = name;
     this.formatName = formatName;
     this.basePath = sanitizeBasePath(basePath);
-    this.dataPath = dataPath == null ? this.getBasePath() : sanitizeBasePath(dataPath);
     this.namespace = namespace;
     this.catalogConfig = catalogConfig;
     this.additionalProperties = additionalProperties;
diff --git a/xtable-api/src/main/java/org/apache/xtable/conversion/SourceTable.java b/xtable-api/src/main/java/org/apache/xtable/conversion/SourceTable.java
index 0a445aa9..f3e1c359 100644
--- a/xtable-api/src/main/java/org/apache/xtable/conversion/SourceTable.java
+++ b/xtable-api/src/main/java/org/apache/xtable/conversion/SourceTable.java
@@ -23,10 +23,13 @@ import java.util.Properties;
 import lombok.Builder;
 import lombok.EqualsAndHashCode;
 import lombok.Getter;
+import lombok.NonNull;
 
 @EqualsAndHashCode(callSuper = true)
 @Getter
 public class SourceTable extends ExternalTable {
+  /** The path to the data files, defaults to the basePath */
+  @NonNull private final String dataPath;
 
   @Builder(toBuilder = true)
   public SourceTable(
@@ -37,6 +40,7 @@ public class SourceTable extends ExternalTable {
       String[] namespace,
       CatalogConfig catalogConfig,
       Properties additionalProperties) {
-    super(name, formatName, basePath, dataPath, namespace, catalogConfig, additionalProperties);
+    super(name, formatName, basePath, namespace, catalogConfig, additionalProperties);
+    this.dataPath = dataPath == null ? this.getBasePath() : sanitizeBasePath(dataPath);
   }
 }
diff --git a/xtable-api/src/main/java/org/apache/xtable/conversion/TargetTable.java b/xtable-api/src/main/java/org/apache/xtable/conversion/TargetTable.java
index 1f9a2a50..b180dc81 100644
--- a/xtable-api/src/main/java/org/apache/xtable/conversion/TargetTable.java
+++ b/xtable-api/src/main/java/org/apache/xtable/conversion/TargetTable.java
@@ -40,13 +40,12 @@ public class TargetTable extends ExternalTable {
       String name,
       String formatName,
       String basePath,
-      String dataPath,
       String[] namespace,
       CatalogConfig catalogConfig,
       Duration metadataRetention,
       Properties additionalProperties,
       List<TargetCatalog> targetCatalogs) {
-    super(name, formatName, basePath, dataPath, namespace, catalogConfig, additionalProperties);
+    super(name, formatName, basePath, namespace, catalogConfig, additionalProperties);
     this.metadataRetention =
         metadataRetention == null ? Duration.of(7, ChronoUnit.DAYS) : metadataRetention;
     this.targetCatalogs = targetCatalogs == null ? Collections.emptyList() : targetCatalogs;
diff --git a/xtable-api/src/test/java/org/apache/xtable/conversion/TestExternalTable.java b/xtable-api/src/test/java/org/apache/xtable/conversion/TestExternalTable.java
index 485a6e20..5422b0a7 100644
--- a/xtable-api/src/test/java/org/apache/xtable/conversion/TestExternalTable.java
+++ b/xtable-api/src/test/java/org/apache/xtable/conversion/TestExternalTable.java
@@ -27,15 +27,15 @@ public class TestExternalTable {
   @Test
   void sanitizePath() {
     ExternalTable tooManySlashes =
-        new ExternalTable("name", "hudi", "s3://bucket//path", null, null, null, null);
+        new ExternalTable("name", "hudi", "s3://bucket//path", null, null, null);
     assertEquals("s3://bucket/path", tooManySlashes.getBasePath());
 
     ExternalTable localFilePath =
-        new ExternalTable("name", "hudi", "/local/data//path", null, null, null, null);
+        new ExternalTable("name", "hudi", "/local/data//path", null, null, null);
     assertEquals("file:///local/data/path", localFilePath.getBasePath());
 
     ExternalTable properLocalFilePath =
-        new ExternalTable("name", "hudi", "file:///local/data//path", null, null, null, null);
+        new ExternalTable("name", "hudi", "file:///local/data//path", null, null, null);
     assertEquals("file:///local/data/path", properLocalFilePath.getBasePath());
   }
 
@@ -43,14 +43,14 @@ public class TestExternalTable {
   void errorIfRequiredArgsNotSet() {
     assertThrows(
         NullPointerException.class,
-        () -> new ExternalTable("name", "hudi", null, null, null, null, null));
+        () -> new ExternalTable("name", "hudi", null, null, null, null));
 
     assertThrows(
         NullPointerException.class,
-        () -> new ExternalTable("name", null, "file://bucket/path", null, null, null, null));
+        () -> new ExternalTable("name", null, "file://bucket/path", null, null, null));
 
     assertThrows(
         NullPointerException.class,
-        () -> new ExternalTable(null, "hudi", "file://bucket/path", null, null, null, null));
+        () -> new ExternalTable(null, "hudi", "file://bucket/path", null, null, null));
   }
 }
diff --git a/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogConversionFactory.java b/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogConversionFactory.java
index 5c220ccd..f033cf2f 100644
--- a/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogConversionFactory.java
+++ b/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogConversionFactory.java
@@ -39,7 +39,6 @@ public class CatalogConversionFactory {
    *
    * @param sourceCatalog definition for the source catalog
    * @param configuration hadoop configuration
-   * @return
    */
   public static CatalogConversionSource createCatalogConversionSource(
       SourceCatalog sourceCatalog, Configuration configuration) {
@@ -53,7 +52,6 @@ public class CatalogConversionFactory {
    *
    * @param targetCatalog definition for the target catalog
    * @param configuration hadoop configuration
-   * @return
    */
   public CatalogSyncClient createCatalogSyncClient(
       TargetCatalog targetCatalog, Configuration configuration) {
diff --git a/xtable-core/src/main/java/org/apache/xtable/catalog/ExternalCatalogConfig.java b/xtable-core/src/main/java/org/apache/xtable/catalog/ExternalCatalogConfig.java
index 0a0d1e77..41113730 100644
--- a/xtable-core/src/main/java/org/apache/xtable/catalog/ExternalCatalogConfig.java
+++ b/xtable-core/src/main/java/org/apache/xtable/catalog/ExternalCatalogConfig.java
@@ -39,6 +39,11 @@ public class ExternalCatalogConfig implements CatalogConfig {
   public static ExternalCatalogConfig fromCatalogType(
       String catalogType, String catalogId, Map<String, String> properties) {
     // TODO: Choose existing implementation based on catalogType.
-    return ExternalCatalogConfig.builder().build();
+    String catalogImpl = "";
+    return ExternalCatalogConfig.builder()
+        .catalogImpl(catalogImpl)
+        .catalogName(catalogId)
+        .catalogOptions(properties)
+        .build();
   }
 }
diff --git a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java
index b7399ba1..1d4495e0 100644
--- a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java
+++ b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java
@@ -18,6 +18,8 @@
  
 package org.apache.xtable.conversion;
 
+import static org.apache.xtable.conversion.ConversionUtils.convertToSourceTable;
+
 import java.io.IOException;
 import java.time.Instant;
 import java.util.Collection;
@@ -36,7 +38,6 @@ import lombok.SneakyThrows;
 import lombok.Value;
 import lombok.extern.log4j.Log4j2;
 
-import org.apache.commons.beanutils.BeanUtils;
 import org.apache.hadoop.conf.Configuration;
 
 import org.apache.xtable.catalog.CatalogConversionFactory;
@@ -196,13 +197,13 @@ public class ConversionController {
       TargetTable targetTable,
       List<CatalogSyncClient> catalogSyncClients,
       ConversionSourceProvider conversionSourceProvider) {
-    SourceTable sourceTable = SourceTable.builder().build();
-    BeanUtils.copyProperties(sourceTable, targetTable);
     return catalogSync.syncTable(
         catalogSyncClients,
-        // We get the latest state of InternalTable for TargetTable and sync it to
-        // catalogSyncClients.
-        conversionSourceProvider.getConversionSourceInstance(sourceTable).getCurrentTable());
+        // We get the latest state of InternalTable for TargetTable
+        // and then synchronize it to catalogSyncClients.
+        conversionSourceProvider
+            .getConversionSourceInstance(convertToSourceTable(targetTable))
+            .getCurrentTable());
   }
 
   private static String getFormatsWithStatusCode(
diff --git a/xtable-utilities/src/test/java/org/apache/xtable/utilities/TestRunCatalogSync.java b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionUtils.java
similarity index 67%
copy from xtable-utilities/src/test/java/org/apache/xtable/utilities/TestRunCatalogSync.java
copy to xtable-core/src/main/java/org/apache/xtable/conversion/ConversionUtils.java
index d5b3c6df..f21be670 100644
--- a/xtable-utilities/src/test/java/org/apache/xtable/utilities/TestRunCatalogSync.java
+++ b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionUtils.java
@@ -16,8 +16,18 @@
  * limitations under the License.
  */
  
-package org.apache.xtable.utilities;
+package org.apache.xtable.conversion;
 
-import static org.junit.jupiter.api.Assertions.*;
+public class ConversionUtils {
 
-class TestRunCatalogSync {}
+  public static SourceTable convertToSourceTable(TargetTable table) {
+    return new SourceTable(
+        table.getName(),
+        table.getFormatName(),
+        table.getBasePath(),
+        table.getBasePath(),
+        table.getNamespace(),
+        table.getCatalogConfig(),
+        table.getAdditionalProperties());
+  }
+}
diff --git a/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java
index df2494b8..51a31a5e 100644
--- a/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java
+++ b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java
@@ -18,8 +18,12 @@
  
 package org.apache.xtable.conversion;
 
+import static org.apache.xtable.conversion.ConversionUtils.convertToSourceTable;
+import static org.apache.xtable.model.storage.TableFormat.DELTA;
 import static org.apache.xtable.model.storage.TableFormat.HUDI;
+import static org.apache.xtable.model.storage.TableFormat.ICEBERG;
 import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.argThat;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
@@ -44,18 +48,21 @@ import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentMatcher;
 
 import org.apache.xtable.catalog.CatalogConversionFactory;
+import org.apache.xtable.catalog.ExternalCatalogConfig;
 import org.apache.xtable.model.CommitsBacklog;
 import org.apache.xtable.model.IncrementalTableChanges;
 import org.apache.xtable.model.InstantsForIncrementalSync;
 import org.apache.xtable.model.InternalSnapshot;
 import org.apache.xtable.model.InternalTable;
 import org.apache.xtable.model.TableChange;
+import org.apache.xtable.model.catalog.CatalogTableIdentifier;
 import org.apache.xtable.model.metadata.TableSyncMetadata;
 import org.apache.xtable.model.storage.TableFormat;
 import org.apache.xtable.model.sync.SyncMode;
 import org.apache.xtable.model.sync.SyncResult;
 import org.apache.xtable.spi.extractor.ConversionSource;
 import org.apache.xtable.spi.sync.CatalogSync;
+import org.apache.xtable.spi.sync.CatalogSyncClient;
 import org.apache.xtable.spi.sync.ConversionTarget;
 import org.apache.xtable.spi.sync.TableFormatSync;
 
@@ -64,6 +71,11 @@ public class TestConversionController {
   private final Configuration mockConf = mock(Configuration.class);
   private final ConversionSourceProvider<Instant> mockConversionSourceProvider =
       mock(ConversionSourceProvider.class);
+  private final ConversionSourceProvider<Instant> mockConversionSourceProvider2 =
+      mock(ConversionSourceProvider.class);
+  private final ConversionSourceProvider<Instant> mockConversionSourceProvider3 =
+      mock(ConversionSourceProvider.class);
+
   private final ConversionSource<Instant> mockConversionSource = mock(ConversionSource.class);
   private final ConversionTargetFactory mockConversionTargetFactory =
       mock(ConversionTargetFactory.class);
@@ -73,6 +85,8 @@ public class TestConversionController {
   private final CatalogSync catalogSync = mock(CatalogSync.class);
   private final ConversionTarget mockConversionTarget1 = mock(ConversionTarget.class);
   private final ConversionTarget mockConversionTarget2 = mock(ConversionTarget.class);
+  private final CatalogSyncClient mockCatalogSyncClient1 = mock(CatalogSyncClient.class);
+  private final CatalogSyncClient mockCatalogSyncClient2 = mock(CatalogSyncClient.class);
 
   @Test
   void testAllSnapshotSyncAsPerConfig() {
@@ -404,10 +418,84 @@ public class TestConversionController {
     assertEquals(expectedSyncResult, result);
   }
 
+  @Test
+  void testNoTableFormatConversionWithMultipleCatalogSync() {
+    SyncMode syncMode = SyncMode.INCREMENTAL;
+    List<TargetCatalog> targetCatalogs =
+        Arrays.asList(getTargetCatalog("1"), getTargetCatalog("2"));
+    InternalTable internalTable = getInternalTable();
+    InternalSnapshot internalSnapshot = buildSnapshot(internalTable, "v1");
+    // Conversion source and target mocks.
+    ConversionConfig conversionConfig =
+        getTableSyncConfig(
+            Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA), syncMode, targetCatalogs);
+    when(mockConversionSourceProvider.getConversionSourceInstance(
+            conversionConfig.getSourceTable()))
+        .thenReturn(mockConversionSource);
+    when(mockConversionSourceProvider.getConversionSourceInstance(
+            convertToSourceTable(conversionConfig.getTargetTables().get(0))))
+        .thenReturn(mockConversionSource);
+    when(mockConversionSourceProvider.getConversionSourceInstance(
+            convertToSourceTable(conversionConfig.getTargetTables().get(1))))
+        .thenReturn(mockConversionSource);
+    when(mockConversionTargetFactory.createForFormat(
+            conversionConfig.getTargetTables().get(0), mockConf))
+        .thenReturn(mockConversionTarget1);
+    when(mockConversionTargetFactory.createForFormat(
+            conversionConfig.getTargetTables().get(1), mockConf))
+        .thenReturn(mockConversionTarget2);
+    when(mockConversionSource.getCurrentSnapshot()).thenReturn(internalSnapshot);
+    when(mockConversionSource.getCurrentTable()).thenReturn(getInternalTable());
+    // Mocks for tableFormatSync.
+    Instant instantBeforeHour = Instant.now().minus(Duration.ofHours(1));
+    SyncResult syncResult = buildSyncResult(syncMode, instantBeforeHour, Duration.ofSeconds(1));
+    Map<String, SyncResult> tableFormatSyncResults =
+        buildPerTableResult(Arrays.asList(ICEBERG, DELTA), syncResult);
+    when(tableFormatSync.syncSnapshot(
+            argThat(containsAll(Arrays.asList(mockConversionTarget1, mockConversionTarget2))),
+            eq(internalSnapshot)))
+        .thenReturn(tableFormatSyncResults);
+    // Mocks for catalogSync.
+    when(mockCatalogConversionFactory.createCatalogSyncClient(targetCatalogs.get(0), mockConf))
+        .thenReturn(mockCatalogSyncClient1);
+    when(mockCatalogConversionFactory.createCatalogSyncClient(targetCatalogs.get(1), mockConf))
+        .thenReturn(mockCatalogSyncClient2);
+    when(catalogSync.syncTable(
+            eq(Arrays.asList(mockCatalogSyncClient1, mockCatalogSyncClient2)), any()))
+        .thenReturn(buildSyncResult(syncMode, Instant.now(), Duration.ofSeconds(3)));
+    ConversionController conversionController =
+        new ConversionController(
+            mockConf,
+            mockConversionTargetFactory,
+            mockCatalogConversionFactory,
+            tableFormatSync,
+            catalogSync);
+    // Mocks for conversionSourceProviders.
+    Map<String, ConversionSourceProvider> conversionSourceProviders = new HashMap<>();
+    conversionSourceProviders.put(HUDI, mockConversionSourceProvider);
+    conversionSourceProviders.put(ICEBERG, mockConversionSourceProvider);
+    conversionSourceProviders.put(DELTA, mockConversionSourceProvider);
+    // Assert results.
+    Map<String, SyncResult> mergedSyncResults =
+        buildPerTableResult(
+            Arrays.asList(ICEBERG, DELTA),
+            syncResult.toBuilder().syncDuration(Duration.ofSeconds(4)).build());
+    Map<String, SyncResult> result =
+        conversionController.syncCatalogs(conversionConfig, conversionSourceProviders);
+    assertEquals(mergedSyncResults, result);
+  }
+
   private SyncResult getLastSyncResult(List<SyncResult> syncResults) {
     return syncResults.get(syncResults.size() - 1);
   }
 
+  private Map<String, SyncResult> buildPerTableResult(
+      List<String> tableFormats, SyncResult syncResult) {
+    Map<String, SyncResult> perTableResults = new HashMap<>();
+    tableFormats.forEach(tableFormat -> perTableResults.put(tableFormat, syncResult));
+    return perTableResults;
+  }
+
   private List<SyncResult> buildSyncResults(List<Instant> instantList) {
     return instantList.stream()
         .map(instant -> buildSyncResult(SyncMode.INCREMENTAL, instant))
@@ -426,6 +514,17 @@ public class TestConversionController {
         .build();
   }
 
+  private SyncResult buildSyncResult(
+      SyncMode syncMode, Instant lastSyncedInstant, Duration duration) {
+    return SyncResult.builder()
+        .mode(syncMode)
+        .lastInstantSynced(lastSyncedInstant)
+        .syncStartTime(Instant.now().minusSeconds(duration.getSeconds()))
+        .syncDuration(duration)
+        .tableFormatSyncStatus(SyncResult.SyncStatus.SUCCESS)
+        .build();
+  }
+
   private InternalSnapshot buildSnapshot(InternalTable internalTable, String version) {
     return InternalSnapshot.builder().table(internalTable).version(version).build();
   }
@@ -443,6 +542,11 @@ public class TestConversionController {
   }
 
   private ConversionConfig getTableSyncConfig(List<String> targetTableFormats, SyncMode syncMode) {
+    return getTableSyncConfig(targetTableFormats, syncMode, Collections.emptyList());
+  }
+
+  private ConversionConfig getTableSyncConfig(
+      List<String> targetTableFormats, SyncMode syncMode, List<TargetCatalog> targetCatalogs) {
     SourceTable sourceTable =
         SourceTable.builder()
             .name("tablename")
@@ -458,6 +562,7 @@ public class TestConversionController {
                         .name("tablename")
                         .formatName(formatName)
                         .basePath("/tmp/doesnt/matter")
+                        .targetCatalogs(targetCatalogs)
                         .build())
             .collect(Collectors.toList());
 
@@ -468,6 +573,23 @@ public class TestConversionController {
         .build();
   }
 
+  private TargetCatalog getTargetCatalog(String suffix) {
+    return TargetCatalog.builder()
+        .catalogId("catalogId-" + suffix)
+        .catalogConfig(
+            ExternalCatalogConfig.builder()
+                .catalogName("catalogName-" + suffix)
+                .catalogImpl("catalogImpl-" + suffix)
+                .catalogOptions(Collections.emptyMap())
+                .build())
+        .catalogTableIdentifier(
+            CatalogTableIdentifier.builder()
+                .databaseName("target-database" + suffix)
+                .tableName("target-tableName" + suffix)
+                .build())
+        .build();
+  }
+
   private static <T> ArgumentMatcher<Collection<T>> containsAll(Collection<T> expected) {
     return actual -> actual.size() == expected.size() && actual.containsAll(expected);
   }
diff --git a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java
index 438faae4..d054693a 100644
--- a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java
+++ b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java
@@ -34,6 +34,7 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import lombok.Data;
+import lombok.extern.log4j.Log4j2;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
@@ -69,11 +70,12 @@ import org.apache.xtable.utilities.RunCatalogSync.DatasetConfig.TargetTableIdent
  * state in target tables, supports table format conversion as well if the target table chooses a
  * different format from source table.
  */
+@Log4j2
 public class RunCatalogSync {
   public static final ObjectMapper YAML_MAPPER = new ObjectMapper(new YAMLFactory());
-  private static final String CATALOG_SOURCE_AND_TARGET_CONFIG_PATH = "c";
-  private static final String HADOOP_CONFIG_PATH = "p";
-  private static final String CONVERTERS_CONFIG_PATH = "c";
+  private static final String CATALOG_SOURCE_AND_TARGET_CONFIG_PATH = "catalogConfig";
+  private static final String HADOOP_CONFIG_PATH = "hadoopConfig";
+  private static final String CONVERTERS_CONFIG_PATH = "convertersConfig";
   private static final String HELP_OPTION = "h";
   private static final Map<String, ConversionSourceProvider> CONVERSION_SOURCE_PROVIDERS =
       new HashMap<>();
@@ -85,6 +87,18 @@ public class RunCatalogSync {
               "catalogSyncConfig",
               true,
               "The path to a yaml file containing source and target tables catalog configurations along with the table identifiers that need to synced")
+          .addOption(
+              HADOOP_CONFIG_PATH,
+              "hadoopConfig",
+              true,
+              "Hadoop config xml file path containing configs necessary to access the "
+                  + "file system. These configs will override the default configs.")
+          .addOption(
+              CONVERTERS_CONFIG_PATH,
+              "convertersConfig",
+              true,
+              "The path to a yaml file containing InternalTable converter configurations. "
+                  + "These configs will override the default")
           .addOption(HELP_OPTION, "help", false, "Displays help information to run this utility");
 
   public static void main(String[] args) throws Exception {
@@ -183,9 +197,13 @@ public class RunCatalogSync {
       tableFormats.addAll(
           targetTables.stream().map(TargetTable::getFormatName).collect(Collectors.toList()));
       tableFormats = tableFormats.stream().distinct().collect(Collectors.toList());
-      conversionController.syncCatalogs(
-          conversionConfig,
-          getConversionSourceProviders(tableFormats, tableFormatConverters, hadoopConf));
+      try {
+        conversionController.syncCatalogs(
+            conversionConfig,
+            getConversionSourceProviders(tableFormats, tableFormatConverters, hadoopConf));
+      } catch (Exception e) {
+        log.error(String.format("Error running sync for %s", sourceTable.getBasePath()), e);
+      }
     }
   }
 
diff --git a/xtable-utilities/src/test/java/org/apache/xtable/utilities/TestRunCatalogSync.java b/xtable-utilities/src/test/java/org/apache/xtable/utilities/TestRunCatalogSync.java
index d5b3c6df..7f6007ae 100644
--- a/xtable-utilities/src/test/java/org/apache/xtable/utilities/TestRunCatalogSync.java
+++ b/xtable-utilities/src/test/java/org/apache/xtable/utilities/TestRunCatalogSync.java
@@ -20,4 +20,101 @@ package org.apache.xtable.utilities;
 
 import static org.junit.jupiter.api.Assertions.*;
 
-class TestRunCatalogSync {}
+import java.util.Collections;
+
+import lombok.SneakyThrows;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.Test;
+
+import org.apache.xtable.catalog.ExternalCatalogConfig;
+import org.apache.xtable.conversion.SourceCatalog;
+import org.apache.xtable.conversion.SourceTable;
+import org.apache.xtable.conversion.TargetCatalog;
+import org.apache.xtable.model.InternalTable;
+import org.apache.xtable.model.catalog.CatalogTableIdentifier;
+import org.apache.xtable.spi.extractor.CatalogConversionSource;
+import org.apache.xtable.spi.sync.CatalogSyncClient;
+
+class TestRunCatalogSync {
+
+  @SneakyThrows
+  @Test
+  void testMain() {
+    String catalogConfigYamlPath =
+        TestRunCatalogSync.class.getClassLoader().getResource("catalogConfig.yaml").getPath();
+    String[] args = {"-catalogConfig", catalogConfigYamlPath};
+    RunCatalogSync.main(args);
+  }
+
+  public static class TestCatalogImpl implements CatalogConversionSource, CatalogSyncClient {
+
+    public TestCatalogImpl(SourceCatalog sourceCatalog, Configuration hadoopConf) {}
+
+    public TestCatalogImpl(TargetCatalog targetCatalog, Configuration hadoopConf) {}
+
+    @Override
+    public SourceTable getSourceTable(CatalogTableIdentifier tableIdentifier) {
+      return SourceTable.builder()
+          .name("source_table_name")
+          .basePath("file://base_path/v1/")
+          .formatName("ICEBERG")
+          .catalogConfig(
+              ExternalCatalogConfig.builder()
+                  .catalogImpl("catalog_impl")
+                  .catalogName("source-1")
+                  .catalogOptions(Collections.emptyMap())
+                  .build())
+          .build();
+    }
+
+    @Override
+    public String getCatalogId() {
+      return null;
+    }
+
+    @Override
+    public String getCatalogImpl() {
+      return null;
+    }
+
+    @Override
+    public CatalogTableIdentifier getTableIdentifier() {
+      return null;
+    }
+
+    @Override
+    public String getStorageDescriptorLocation(Object o) {
+      return null;
+    }
+
+    @Override
+    public boolean hasDatabase(String databaseName) {
+      return false;
+    }
+
+    @Override
+    public void createDatabase(String databaseName) {}
+
+    @Override
+    public Object getTable(CatalogTableIdentifier tableIdentifier) {
+      return null;
+    }
+
+    @Override
+    public void createTable(InternalTable table, CatalogTableIdentifier tableIdentifier) {}
+
+    @Override
+    public void refreshTable(
+        InternalTable table, Object catalogTable, CatalogTableIdentifier tableIdentifier) {}
+
+    @Override
+    public void createOrReplaceTable(InternalTable table, CatalogTableIdentifier tableIdentifier) {}
+
+    @Override
+    public void dropTable(InternalTable table, CatalogTableIdentifier tableIdentifier) {}
+
+    @Override
+    public void close() throws Exception {}
+  }
+}
diff --git a/xtable-utilities/src/main/resources/catalogConfig.yaml b/xtable-utilities/src/test/resources/catalogConfig.yaml
similarity index 79%
rename from xtable-utilities/src/main/resources/catalogConfig.yaml
rename to xtable-utilities/src/test/resources/catalogConfig.yaml
index a9fd6e21..b59af884 100644
--- a/xtable-utilities/src/main/resources/catalogConfig.yaml
+++ b/xtable-utilities/src/test/resources/catalogConfig.yaml
@@ -16,26 +16,26 @@
 #
 sourceCatalog:
   catalogId: "source-1"
-  catalogType: "type1"
+  catalogImpl: "org.apache.xtable.utilities.TestRunCatalogSync$TestCatalogImpl"
   catalogProperties:
     key01: "value1"
     key02: "value2"
     key03: "value3"
 targetCatalogs:
   - catalogId: "target-1"
-    catalogType: "type2"
+    catalogImpl: "org.apache.xtable.utilities.TestRunCatalogSync$TestCatalogImpl"
     catalogProperties:
       key11: "value1"
       key12: "value2"
       key13: "value3"
   - catalogId: "target-2"
-    catalogType: "type3"
+    catalogImpl: "org.apache.xtable.utilities.TestRunCatalogSync$TestCatalogImpl"
     catalogProperties:
       key21: "value1"
       key22: "value2"
       key23: "value3"
   - catalogId: "target-3"
-    catalogType: "type3"
+    catalogImpl: "org.apache.xtable.utilities.TestRunCatalogSync$TestCatalogImpl"
     catalogProperties:
       key31: "value1"
       key32: "value2"
@@ -45,29 +45,30 @@ datasets:
       catalogTableIdentifier:
         databaseName: "source-database-1"
         tableName: "source-1"
-    targetCatalogIdentifiers:
-      - catalogIdentifier: "target-1"
+    targetCatalogTableIdentifiers:
+      - catalogId: "target-1"
         tableFormat: "DELTA"
         catalogTableIdentifier:
           databaseName: "target-database-1"
           tableName: "target-tableName-1"
-      - catalogIdentifier: "target-2"
+      - catalogId: "target-2"
         tableFormat: "HUDI"
         catalogTableIdentifier:
           databaseName: "target-database-2"
           tableName: "target-tableName-2-delta"
-  - sourceCatalogIdentifier:
+  - sourceCatalogTableIdentifier:
       storageIdentifier:
         tableBasePath: s3://tpc-ds-datasets/1GB/hudi/catalog_sales
         tableName: catalog_sales
         partitionSpec: cs_sold_date_sk:VALUE
-    targetCatalogIdentifiers:
-      - catalogIdentifier: "target-2"
+        tableFormat: "HUDI"
+    targetCatalogTableIdentifiers:
+      - catalogId: "target-2"
         tableFormat: "ICEBERG"
         catalogTableIdentifier:
           databaseName: "target-database-2"
           tableName: "target-tableName-2"
-      - catalogIdentifier: "target-3"
+      - catalogId: "target-3"
         tableFormat: "HUDI"
         catalogTableIdentifier:
           databaseName: "target-database-3"

Reply via email to