This is an automated email from the ASF dual-hosted git repository.
vinish pushed a commit to branch 590-CatalogSync
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git
The following commit(s) were added to refs/heads/590-CatalogSync by this push:
new fce3524d Add tests - Part2
fce3524d is described below
commit fce3524d5fed70b5c50fd0b2437d4a8e0538c974
Author: Vinish Reddy <[email protected]>
AuthorDate: Thu Dec 12 17:56:34 2024 -0800
Add tests - Part2
---
.../apache/xtable/conversion/ExternalTable.java | 6 -
.../org/apache/xtable/conversion/SourceTable.java | 6 +-
.../org/apache/xtable/conversion/TargetTable.java | 3 +-
.../xtable/conversion/TestExternalTable.java | 12 +-
.../xtable/catalog/CatalogConversionFactory.java | 2 -
.../xtable/catalog/ExternalCatalogConfig.java | 7 +-
.../xtable/conversion/ConversionController.java | 13 ++-
.../apache/xtable/conversion/ConversionUtils.java | 16 ++-
.../conversion/TestConversionController.java | 122 +++++++++++++++++++++
.../apache/xtable/utilities/RunCatalogSync.java | 30 ++++-
.../xtable/utilities/TestRunCatalogSync.java | 99 ++++++++++++++++-
.../{main => test}/resources/catalogConfig.yaml | 23 ++--
12 files changed, 294 insertions(+), 45 deletions(-)
diff --git a/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalTable.java b/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalTable.java
index 0dda90f3..939c59c0 100644
--- a/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalTable.java
+++ b/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalTable.java
@@ -20,8 +20,6 @@ package org.apache.xtable.conversion;
import java.util.Properties;
-import javax.annotation.Nullable;
-
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NonNull;
@@ -40,8 +38,6 @@ class ExternalTable {
protected final @NonNull String formatName;
/** The path to the root of the table or the metadata directory depending on
the format */
protected final @NonNull String basePath;
- /** The path to the data files, defaults to the metadataPath */
- protected final @NonNull String dataPath;
/** Optional namespace for the table */
protected final String[] namespace;
/** The configuration for interacting with the catalog that manages this
table */
@@ -54,14 +50,12 @@ class ExternalTable {
@NonNull String name,
@NonNull String formatName,
@NonNull String basePath,
- @Nullable String dataPath,
String[] namespace,
CatalogConfig catalogConfig,
Properties additionalProperties) {
this.name = name;
this.formatName = formatName;
this.basePath = sanitizeBasePath(basePath);
- this.dataPath = dataPath == null ? this.getBasePath() :
sanitizeBasePath(dataPath);
this.namespace = namespace;
this.catalogConfig = catalogConfig;
this.additionalProperties = additionalProperties;
diff --git a/xtable-api/src/main/java/org/apache/xtable/conversion/SourceTable.java b/xtable-api/src/main/java/org/apache/xtable/conversion/SourceTable.java
index 0a445aa9..f3e1c359 100644
--- a/xtable-api/src/main/java/org/apache/xtable/conversion/SourceTable.java
+++ b/xtable-api/src/main/java/org/apache/xtable/conversion/SourceTable.java
@@ -23,10 +23,13 @@ import java.util.Properties;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
+import lombok.NonNull;
@EqualsAndHashCode(callSuper = true)
@Getter
public class SourceTable extends ExternalTable {
+ /** The path to the data files, defaults to the basePath */
+ @NonNull private final String dataPath;
@Builder(toBuilder = true)
public SourceTable(
@@ -37,6 +40,7 @@ public class SourceTable extends ExternalTable {
String[] namespace,
CatalogConfig catalogConfig,
Properties additionalProperties) {
- super(name, formatName, basePath, dataPath, namespace, catalogConfig,
additionalProperties);
+ super(name, formatName, basePath, namespace, catalogConfig,
additionalProperties);
+ this.dataPath = dataPath == null ? this.getBasePath() :
sanitizeBasePath(dataPath);
}
}
diff --git a/xtable-api/src/main/java/org/apache/xtable/conversion/TargetTable.java b/xtable-api/src/main/java/org/apache/xtable/conversion/TargetTable.java
index 1f9a2a50..b180dc81 100644
--- a/xtable-api/src/main/java/org/apache/xtable/conversion/TargetTable.java
+++ b/xtable-api/src/main/java/org/apache/xtable/conversion/TargetTable.java
@@ -40,13 +40,12 @@ public class TargetTable extends ExternalTable {
String name,
String formatName,
String basePath,
- String dataPath,
String[] namespace,
CatalogConfig catalogConfig,
Duration metadataRetention,
Properties additionalProperties,
List<TargetCatalog> targetCatalogs) {
- super(name, formatName, basePath, dataPath, namespace, catalogConfig,
additionalProperties);
+ super(name, formatName, basePath, namespace, catalogConfig,
additionalProperties);
this.metadataRetention =
metadataRetention == null ? Duration.of(7, ChronoUnit.DAYS) :
metadataRetention;
this.targetCatalogs = targetCatalogs == null ? Collections.emptyList() :
targetCatalogs;
diff --git a/xtable-api/src/test/java/org/apache/xtable/conversion/TestExternalTable.java b/xtable-api/src/test/java/org/apache/xtable/conversion/TestExternalTable.java
index 485a6e20..5422b0a7 100644
--- a/xtable-api/src/test/java/org/apache/xtable/conversion/TestExternalTable.java
+++ b/xtable-api/src/test/java/org/apache/xtable/conversion/TestExternalTable.java
@@ -27,15 +27,15 @@ public class TestExternalTable {
@Test
void sanitizePath() {
ExternalTable tooManySlashes =
- new ExternalTable("name", "hudi", "s3://bucket//path", null, null,
null, null);
+ new ExternalTable("name", "hudi", "s3://bucket//path", null, null,
null);
assertEquals("s3://bucket/path", tooManySlashes.getBasePath());
ExternalTable localFilePath =
- new ExternalTable("name", "hudi", "/local/data//path", null, null,
null, null);
+ new ExternalTable("name", "hudi", "/local/data//path", null, null,
null);
assertEquals("file:///local/data/path", localFilePath.getBasePath());
ExternalTable properLocalFilePath =
- new ExternalTable("name", "hudi", "file:///local/data//path", null,
null, null, null);
+ new ExternalTable("name", "hudi", "file:///local/data//path", null,
null, null);
assertEquals("file:///local/data/path", properLocalFilePath.getBasePath());
}
@@ -43,14 +43,14 @@ public class TestExternalTable {
void errorIfRequiredArgsNotSet() {
assertThrows(
NullPointerException.class,
- () -> new ExternalTable("name", "hudi", null, null, null, null, null));
+ () -> new ExternalTable("name", "hudi", null, null, null, null));
assertThrows(
NullPointerException.class,
- () -> new ExternalTable("name", null, "file://bucket/path", null,
null, null, null));
+ () -> new ExternalTable("name", null, "file://bucket/path", null,
null, null));
assertThrows(
NullPointerException.class,
- () -> new ExternalTable(null, "hudi", "file://bucket/path", null,
null, null, null));
+ () -> new ExternalTable(null, "hudi", "file://bucket/path", null,
null, null));
}
}
diff --git a/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogConversionFactory.java b/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogConversionFactory.java
index 5c220ccd..f033cf2f 100644
--- a/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogConversionFactory.java
+++ b/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogConversionFactory.java
@@ -39,7 +39,6 @@ public class CatalogConversionFactory {
*
* @param sourceCatalog definition for the source catalog
* @param configuration hadoop configuration
- * @return
*/
public static CatalogConversionSource createCatalogConversionSource(
SourceCatalog sourceCatalog, Configuration configuration) {
@@ -53,7 +52,6 @@ public class CatalogConversionFactory {
*
* @param targetCatalog definition for the target catalog
* @param configuration hadoop configuration
- * @return
*/
public CatalogSyncClient createCatalogSyncClient(
TargetCatalog targetCatalog, Configuration configuration) {
diff --git a/xtable-core/src/main/java/org/apache/xtable/catalog/ExternalCatalogConfig.java b/xtable-core/src/main/java/org/apache/xtable/catalog/ExternalCatalogConfig.java
index 0a0d1e77..41113730 100644
--- a/xtable-core/src/main/java/org/apache/xtable/catalog/ExternalCatalogConfig.java
+++ b/xtable-core/src/main/java/org/apache/xtable/catalog/ExternalCatalogConfig.java
@@ -39,6 +39,11 @@ public class ExternalCatalogConfig implements CatalogConfig {
public static ExternalCatalogConfig fromCatalogType(
String catalogType, String catalogId, Map<String, String> properties) {
// TODO: Choose existing implementation based on catalogType.
- return ExternalCatalogConfig.builder().build();
+ String catalogImpl = "";
+ return ExternalCatalogConfig.builder()
+ .catalogImpl(catalogImpl)
+ .catalogName(catalogId)
+ .catalogOptions(properties)
+ .build();
}
}
diff --git a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java
index b7399ba1..1d4495e0 100644
--- a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java
+++ b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java
@@ -18,6 +18,8 @@
package org.apache.xtable.conversion;
+import static
org.apache.xtable.conversion.ConversionUtils.convertToSourceTable;
+
import java.io.IOException;
import java.time.Instant;
import java.util.Collection;
@@ -36,7 +38,6 @@ import lombok.SneakyThrows;
import lombok.Value;
import lombok.extern.log4j.Log4j2;
-import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.xtable.catalog.CatalogConversionFactory;
@@ -196,13 +197,13 @@ public class ConversionController {
TargetTable targetTable,
List<CatalogSyncClient> catalogSyncClients,
ConversionSourceProvider conversionSourceProvider) {
- SourceTable sourceTable = SourceTable.builder().build();
- BeanUtils.copyProperties(sourceTable, targetTable);
return catalogSync.syncTable(
catalogSyncClients,
- // We get the latest state of InternalTable for TargetTable and sync
it to
- // catalogSyncClients.
-
conversionSourceProvider.getConversionSourceInstance(sourceTable).getCurrentTable());
+ // We get the latest state of InternalTable for TargetTable
+ // and then synchronize it to catalogSyncClients.
+ conversionSourceProvider
+ .getConversionSourceInstance(convertToSourceTable(targetTable))
+ .getCurrentTable());
}
private static String getFormatsWithStatusCode(
diff --git a/xtable-utilities/src/test/java/org/apache/xtable/utilities/TestRunCatalogSync.java b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionUtils.java
similarity index 67%
copy from xtable-utilities/src/test/java/org/apache/xtable/utilities/TestRunCatalogSync.java
copy to xtable-core/src/main/java/org/apache/xtable/conversion/ConversionUtils.java
index d5b3c6df..f21be670 100644
--- a/xtable-utilities/src/test/java/org/apache/xtable/utilities/TestRunCatalogSync.java
+++ b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionUtils.java
@@ -16,8 +16,18 @@
* limitations under the License.
*/
-package org.apache.xtable.utilities;
+package org.apache.xtable.conversion;
-import static org.junit.jupiter.api.Assertions.*;
+public class ConversionUtils {
-class TestRunCatalogSync {}
+ public static SourceTable convertToSourceTable(TargetTable table) {
+ return new SourceTable(
+ table.getName(),
+ table.getFormatName(),
+ table.getBasePath(),
+ table.getBasePath(),
+ table.getNamespace(),
+ table.getCatalogConfig(),
+ table.getAdditionalProperties());
+ }
+}
diff --git a/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java
index df2494b8..51a31a5e 100644
--- a/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java
+++ b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java
@@ -18,8 +18,12 @@
package org.apache.xtable.conversion;
+import static
org.apache.xtable.conversion.ConversionUtils.convertToSourceTable;
+import static org.apache.xtable.model.storage.TableFormat.DELTA;
import static org.apache.xtable.model.storage.TableFormat.HUDI;
+import static org.apache.xtable.model.storage.TableFormat.ICEBERG;
import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
@@ -44,18 +48,21 @@ import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatcher;
import org.apache.xtable.catalog.CatalogConversionFactory;
+import org.apache.xtable.catalog.ExternalCatalogConfig;
import org.apache.xtable.model.CommitsBacklog;
import org.apache.xtable.model.IncrementalTableChanges;
import org.apache.xtable.model.InstantsForIncrementalSync;
import org.apache.xtable.model.InternalSnapshot;
import org.apache.xtable.model.InternalTable;
import org.apache.xtable.model.TableChange;
+import org.apache.xtable.model.catalog.CatalogTableIdentifier;
import org.apache.xtable.model.metadata.TableSyncMetadata;
import org.apache.xtable.model.storage.TableFormat;
import org.apache.xtable.model.sync.SyncMode;
import org.apache.xtable.model.sync.SyncResult;
import org.apache.xtable.spi.extractor.ConversionSource;
import org.apache.xtable.spi.sync.CatalogSync;
+import org.apache.xtable.spi.sync.CatalogSyncClient;
import org.apache.xtable.spi.sync.ConversionTarget;
import org.apache.xtable.spi.sync.TableFormatSync;
@@ -64,6 +71,11 @@ public class TestConversionController {
private final Configuration mockConf = mock(Configuration.class);
private final ConversionSourceProvider<Instant> mockConversionSourceProvider
=
mock(ConversionSourceProvider.class);
+ private final ConversionSourceProvider<Instant>
mockConversionSourceProvider2 =
+ mock(ConversionSourceProvider.class);
+ private final ConversionSourceProvider<Instant>
mockConversionSourceProvider3 =
+ mock(ConversionSourceProvider.class);
+
private final ConversionSource<Instant> mockConversionSource =
mock(ConversionSource.class);
private final ConversionTargetFactory mockConversionTargetFactory =
mock(ConversionTargetFactory.class);
@@ -73,6 +85,8 @@ public class TestConversionController {
private final CatalogSync catalogSync = mock(CatalogSync.class);
private final ConversionTarget mockConversionTarget1 =
mock(ConversionTarget.class);
private final ConversionTarget mockConversionTarget2 =
mock(ConversionTarget.class);
+ private final CatalogSyncClient mockCatalogSyncClient1 =
mock(CatalogSyncClient.class);
+ private final CatalogSyncClient mockCatalogSyncClient2 =
mock(CatalogSyncClient.class);
@Test
void testAllSnapshotSyncAsPerConfig() {
@@ -404,10 +418,84 @@ public class TestConversionController {
assertEquals(expectedSyncResult, result);
}
+ @Test
+ void testNoTableFormatConversionWithMultipleCatalogSync() {
+ SyncMode syncMode = SyncMode.INCREMENTAL;
+ List<TargetCatalog> targetCatalogs =
+ Arrays.asList(getTargetCatalog("1"), getTargetCatalog("2"));
+ InternalTable internalTable = getInternalTable();
+ InternalSnapshot internalSnapshot = buildSnapshot(internalTable, "v1");
+ // Conversion source and target mocks.
+ ConversionConfig conversionConfig =
+ getTableSyncConfig(
+ Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA), syncMode,
targetCatalogs);
+ when(mockConversionSourceProvider.getConversionSourceInstance(
+ conversionConfig.getSourceTable()))
+ .thenReturn(mockConversionSource);
+ when(mockConversionSourceProvider.getConversionSourceInstance(
+ convertToSourceTable(conversionConfig.getTargetTables().get(0))))
+ .thenReturn(mockConversionSource);
+ when(mockConversionSourceProvider.getConversionSourceInstance(
+ convertToSourceTable(conversionConfig.getTargetTables().get(1))))
+ .thenReturn(mockConversionSource);
+ when(mockConversionTargetFactory.createForFormat(
+ conversionConfig.getTargetTables().get(0), mockConf))
+ .thenReturn(mockConversionTarget1);
+ when(mockConversionTargetFactory.createForFormat(
+ conversionConfig.getTargetTables().get(1), mockConf))
+ .thenReturn(mockConversionTarget2);
+
when(mockConversionSource.getCurrentSnapshot()).thenReturn(internalSnapshot);
+
when(mockConversionSource.getCurrentTable()).thenReturn(getInternalTable());
+ // Mocks for tableFormatSync.
+ Instant instantBeforeHour = Instant.now().minus(Duration.ofHours(1));
+ SyncResult syncResult = buildSyncResult(syncMode, instantBeforeHour,
Duration.ofSeconds(1));
+ Map<String, SyncResult> tableFormatSyncResults =
+ buildPerTableResult(Arrays.asList(ICEBERG, DELTA), syncResult);
+ when(tableFormatSync.syncSnapshot(
+ argThat(containsAll(Arrays.asList(mockConversionTarget1,
mockConversionTarget2))),
+ eq(internalSnapshot)))
+ .thenReturn(tableFormatSyncResults);
+ // Mocks for catalogSync.
+
when(mockCatalogConversionFactory.createCatalogSyncClient(targetCatalogs.get(0),
mockConf))
+ .thenReturn(mockCatalogSyncClient1);
+
when(mockCatalogConversionFactory.createCatalogSyncClient(targetCatalogs.get(1),
mockConf))
+ .thenReturn(mockCatalogSyncClient2);
+ when(catalogSync.syncTable(
+ eq(Arrays.asList(mockCatalogSyncClient1, mockCatalogSyncClient2)),
any()))
+ .thenReturn(buildSyncResult(syncMode, Instant.now(),
Duration.ofSeconds(3)));
+ ConversionController conversionController =
+ new ConversionController(
+ mockConf,
+ mockConversionTargetFactory,
+ mockCatalogConversionFactory,
+ tableFormatSync,
+ catalogSync);
+ // Mocks for conversionSourceProviders.
+ Map<String, ConversionSourceProvider> conversionSourceProviders = new
HashMap<>();
+ conversionSourceProviders.put(HUDI, mockConversionSourceProvider);
+ conversionSourceProviders.put(ICEBERG, mockConversionSourceProvider);
+ conversionSourceProviders.put(DELTA, mockConversionSourceProvider);
+ // Assert results.
+ Map<String, SyncResult> mergedSyncResults =
+ buildPerTableResult(
+ Arrays.asList(ICEBERG, DELTA),
+
syncResult.toBuilder().syncDuration(Duration.ofSeconds(4)).build());
+ Map<String, SyncResult> result =
+ conversionController.syncCatalogs(conversionConfig,
conversionSourceProviders);
+ assertEquals(mergedSyncResults, result);
+ }
+
private SyncResult getLastSyncResult(List<SyncResult> syncResults) {
return syncResults.get(syncResults.size() - 1);
}
+ private Map<String, SyncResult> buildPerTableResult(
+ List<String> tableFormats, SyncResult syncResult) {
+ Map<String, SyncResult> perTableResults = new HashMap<>();
+ tableFormats.forEach(tableFormat -> perTableResults.put(tableFormat,
syncResult));
+ return perTableResults;
+ }
+
private List<SyncResult> buildSyncResults(List<Instant> instantList) {
return instantList.stream()
.map(instant -> buildSyncResult(SyncMode.INCREMENTAL, instant))
@@ -426,6 +514,17 @@ public class TestConversionController {
.build();
}
+ private SyncResult buildSyncResult(
+ SyncMode syncMode, Instant lastSyncedInstant, Duration duration) {
+ return SyncResult.builder()
+ .mode(syncMode)
+ .lastInstantSynced(lastSyncedInstant)
+ .syncStartTime(Instant.now().minusSeconds(duration.getSeconds()))
+ .syncDuration(duration)
+ .tableFormatSyncStatus(SyncResult.SyncStatus.SUCCESS)
+ .build();
+ }
+
private InternalSnapshot buildSnapshot(InternalTable internalTable, String
version) {
return
InternalSnapshot.builder().table(internalTable).version(version).build();
}
@@ -443,6 +542,11 @@ public class TestConversionController {
}
private ConversionConfig getTableSyncConfig(List<String> targetTableFormats,
SyncMode syncMode) {
+ return getTableSyncConfig(targetTableFormats, syncMode,
Collections.emptyList());
+ }
+
+ private ConversionConfig getTableSyncConfig(
+ List<String> targetTableFormats, SyncMode syncMode, List<TargetCatalog>
targetCatalogs) {
SourceTable sourceTable =
SourceTable.builder()
.name("tablename")
@@ -458,6 +562,7 @@ public class TestConversionController {
.name("tablename")
.formatName(formatName)
.basePath("/tmp/doesnt/matter")
+ .targetCatalogs(targetCatalogs)
.build())
.collect(Collectors.toList());
@@ -468,6 +573,23 @@ public class TestConversionController {
.build();
}
+ private TargetCatalog getTargetCatalog(String suffix) {
+ return TargetCatalog.builder()
+ .catalogId("catalogId-" + suffix)
+ .catalogConfig(
+ ExternalCatalogConfig.builder()
+ .catalogName("catalogName-" + suffix)
+ .catalogImpl("catalogImpl-" + suffix)
+ .catalogOptions(Collections.emptyMap())
+ .build())
+ .catalogTableIdentifier(
+ CatalogTableIdentifier.builder()
+ .databaseName("target-database" + suffix)
+ .tableName("target-tableName" + suffix)
+ .build())
+ .build();
+ }
+
private static <T> ArgumentMatcher<Collection<T>> containsAll(Collection<T>
expected) {
return actual -> actual.size() == expected.size() &&
actual.containsAll(expected);
}
diff --git a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java
index 438faae4..d054693a 100644
--- a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java
+++ b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java
@@ -34,6 +34,7 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.Data;
+import lombok.extern.log4j.Log4j2;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
@@ -69,11 +70,12 @@ import
org.apache.xtable.utilities.RunCatalogSync.DatasetConfig.TargetTableIdent
* state in target tables, supports table format conversion as well if the
target table chooses a
* different format from source table.
*/
+@Log4j2
public class RunCatalogSync {
public static final ObjectMapper YAML_MAPPER = new ObjectMapper(new
YAMLFactory());
- private static final String CATALOG_SOURCE_AND_TARGET_CONFIG_PATH = "c";
- private static final String HADOOP_CONFIG_PATH = "p";
- private static final String CONVERTERS_CONFIG_PATH = "c";
+ private static final String CATALOG_SOURCE_AND_TARGET_CONFIG_PATH =
"catalogConfig";
+ private static final String HADOOP_CONFIG_PATH = "hadoopConfig";
+ private static final String CONVERTERS_CONFIG_PATH = "convertersConfig";
private static final String HELP_OPTION = "h";
private static final Map<String, ConversionSourceProvider>
CONVERSION_SOURCE_PROVIDERS =
new HashMap<>();
@@ -85,6 +87,18 @@ public class RunCatalogSync {
"catalogSyncConfig",
true,
"The path to a yaml file containing source and target tables
catalog configurations along with the table identifiers that need to synced")
+ .addOption(
+ HADOOP_CONFIG_PATH,
+ "hadoopConfig",
+ true,
+ "Hadoop config xml file path containing configs necessary to
access the "
+ + "file system. These configs will override the default
configs.")
+ .addOption(
+ CONVERTERS_CONFIG_PATH,
+ "convertersConfig",
+ true,
+ "The path to a yaml file containing InternalTable converter
configurations. "
+ + "These configs will override the default")
.addOption(HELP_OPTION, "help", false, "Displays help information to
run this utility");
public static void main(String[] args) throws Exception {
@@ -183,9 +197,13 @@ public class RunCatalogSync {
tableFormats.addAll(
targetTables.stream().map(TargetTable::getFormatName).collect(Collectors.toList()));
tableFormats =
tableFormats.stream().distinct().collect(Collectors.toList());
- conversionController.syncCatalogs(
- conversionConfig,
- getConversionSourceProviders(tableFormats, tableFormatConverters,
hadoopConf));
+ try {
+ conversionController.syncCatalogs(
+ conversionConfig,
+ getConversionSourceProviders(tableFormats, tableFormatConverters,
hadoopConf));
+ } catch (Exception e) {
+ log.error(String.format("Error running sync for %s",
sourceTable.getBasePath()), e);
+ }
}
}
diff --git a/xtable-utilities/src/test/java/org/apache/xtable/utilities/TestRunCatalogSync.java b/xtable-utilities/src/test/java/org/apache/xtable/utilities/TestRunCatalogSync.java
index d5b3c6df..7f6007ae 100644
--- a/xtable-utilities/src/test/java/org/apache/xtable/utilities/TestRunCatalogSync.java
+++ b/xtable-utilities/src/test/java/org/apache/xtable/utilities/TestRunCatalogSync.java
@@ -20,4 +20,101 @@ package org.apache.xtable.utilities;
import static org.junit.jupiter.api.Assertions.*;
-class TestRunCatalogSync {}
+import java.util.Collections;
+
+import lombok.SneakyThrows;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.Test;
+
+import org.apache.xtable.catalog.ExternalCatalogConfig;
+import org.apache.xtable.conversion.SourceCatalog;
+import org.apache.xtable.conversion.SourceTable;
+import org.apache.xtable.conversion.TargetCatalog;
+import org.apache.xtable.model.InternalTable;
+import org.apache.xtable.model.catalog.CatalogTableIdentifier;
+import org.apache.xtable.spi.extractor.CatalogConversionSource;
+import org.apache.xtable.spi.sync.CatalogSyncClient;
+
+class TestRunCatalogSync {
+
+ @SneakyThrows
+ @Test
+ void testMain() {
+ String catalogConfigYamlPath =
+
TestRunCatalogSync.class.getClassLoader().getResource("catalogConfig.yaml").getPath();
+ String[] args = {"-catalogConfig", catalogConfigYamlPath};
+ RunCatalogSync.main(args);
+ }
+
+ public static class TestCatalogImpl implements CatalogConversionSource,
CatalogSyncClient {
+
+ public TestCatalogImpl(SourceCatalog sourceCatalog, Configuration
hadoopConf) {}
+
+ public TestCatalogImpl(TargetCatalog targetCatalog, Configuration
hadoopConf) {}
+
+ @Override
+ public SourceTable getSourceTable(CatalogTableIdentifier tableIdentifier) {
+ return SourceTable.builder()
+ .name("source_table_name")
+ .basePath("file://base_path/v1/")
+ .formatName("ICEBERG")
+ .catalogConfig(
+ ExternalCatalogConfig.builder()
+ .catalogImpl("catalog_impl")
+ .catalogName("source-1")
+ .catalogOptions(Collections.emptyMap())
+ .build())
+ .build();
+ }
+
+ @Override
+ public String getCatalogId() {
+ return null;
+ }
+
+ @Override
+ public String getCatalogImpl() {
+ return null;
+ }
+
+ @Override
+ public CatalogTableIdentifier getTableIdentifier() {
+ return null;
+ }
+
+ @Override
+ public String getStorageDescriptorLocation(Object o) {
+ return null;
+ }
+
+ @Override
+ public boolean hasDatabase(String databaseName) {
+ return false;
+ }
+
+ @Override
+ public void createDatabase(String databaseName) {}
+
+ @Override
+ public Object getTable(CatalogTableIdentifier tableIdentifier) {
+ return null;
+ }
+
+ @Override
+ public void createTable(InternalTable table, CatalogTableIdentifier
tableIdentifier) {}
+
+ @Override
+ public void refreshTable(
+ InternalTable table, Object catalogTable, CatalogTableIdentifier
tableIdentifier) {}
+
+ @Override
+ public void createOrReplaceTable(InternalTable table,
CatalogTableIdentifier tableIdentifier) {}
+
+ @Override
+ public void dropTable(InternalTable table, CatalogTableIdentifier
tableIdentifier) {}
+
+ @Override
+ public void close() throws Exception {}
+ }
+}
diff --git a/xtable-utilities/src/main/resources/catalogConfig.yaml b/xtable-utilities/src/test/resources/catalogConfig.yaml
similarity index 79%
rename from xtable-utilities/src/main/resources/catalogConfig.yaml
rename to xtable-utilities/src/test/resources/catalogConfig.yaml
index a9fd6e21..b59af884 100644
--- a/xtable-utilities/src/main/resources/catalogConfig.yaml
+++ b/xtable-utilities/src/test/resources/catalogConfig.yaml
@@ -16,26 +16,26 @@
#
sourceCatalog:
catalogId: "source-1"
- catalogType: "type1"
+ catalogImpl: "org.apache.xtable.utilities.TestRunCatalogSync$TestCatalogImpl"
catalogProperties:
key01: "value1"
key02: "value2"
key03: "value3"
targetCatalogs:
- catalogId: "target-1"
- catalogType: "type2"
+ catalogImpl:
"org.apache.xtable.utilities.TestRunCatalogSync$TestCatalogImpl"
catalogProperties:
key11: "value1"
key12: "value2"
key13: "value3"
- catalogId: "target-2"
- catalogType: "type3"
+ catalogImpl:
"org.apache.xtable.utilities.TestRunCatalogSync$TestCatalogImpl"
catalogProperties:
key21: "value1"
key22: "value2"
key23: "value3"
- catalogId: "target-3"
- catalogType: "type3"
+ catalogImpl:
"org.apache.xtable.utilities.TestRunCatalogSync$TestCatalogImpl"
catalogProperties:
key31: "value1"
key32: "value2"
@@ -45,29 +45,30 @@ datasets:
catalogTableIdentifier:
databaseName: "source-database-1"
tableName: "source-1"
- targetCatalogIdentifiers:
- - catalogIdentifier: "target-1"
+ targetCatalogTableIdentifiers:
+ - catalogId: "target-1"
tableFormat: "DELTA"
catalogTableIdentifier:
databaseName: "target-database-1"
tableName: "target-tableName-1"
- - catalogIdentifier: "target-2"
+ - catalogId: "target-2"
tableFormat: "HUDI"
catalogTableIdentifier:
databaseName: "target-database-2"
tableName: "target-tableName-2-delta"
- - sourceCatalogIdentifier:
+ - sourceCatalogTableIdentifier:
storageIdentifier:
tableBasePath: s3://tpc-ds-datasets/1GB/hudi/catalog_sales
tableName: catalog_sales
partitionSpec: cs_sold_date_sk:VALUE
- targetCatalogIdentifiers:
- - catalogIdentifier: "target-2"
+ tableFormat: "HUDI"
+ targetCatalogTableIdentifiers:
+ - catalogId: "target-2"
tableFormat: "ICEBERG"
catalogTableIdentifier:
databaseName: "target-database-2"
tableName: "target-tableName-2"
- - catalogIdentifier: "target-3"
+ - catalogId: "target-3"
tableFormat: "HUDI"
catalogTableIdentifier:
databaseName: "target-database-3"