This is an automated email from the ASF dual-hosted git repository.

vinish pushed a commit to branch 590-CatalogSync
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git
commit 6c269e48f43dd54fd8731112948638879b76eea6 Author: Vinish Reddy <[email protected]> AuthorDate: Mon Jan 27 11:15:03 2025 -0800 [590] Add RunCatalogSync utility for synchronizing tables across catalogs --- .../apache/xtable/conversion/ConversionConfig.java | 12 +- .../xtable/conversion/ExternalCatalogConfig.java | 65 ++++ .../xtable/conversion/TargetCatalogConfig.java | 21 +- .../apache/xtable/model/storage/CatalogType.java | 24 +- .../spi/extractor/CatalogConversionSource.java | 3 + .../apache/xtable/spi/sync/CatalogSyncClient.java | 3 + xtable-core/pom.xml | 20 ++ .../xtable/catalog/CatalogConversionFactory.java | 96 ++++++ .../xtable/conversion/ConversionController.java | 212 +++++++++--- .../ConversionUtils.java} | 27 +- .../xtable/iceberg/IcebergCatalogConfig.java | 8 +- .../catalog/TestCatalogConversionFactory.java | 108 ++++++ .../conversion/TestConversionController.java | 174 +++++++++- .../org/apache/xtable/testutil/ITTestUtils.java | 108 ++++++ ...he.xtable.spi.extractor.CatalogConversionSource | 18 + .../org.apache.xtable.spi.sync.CatalogSyncClient | 18 + xtable-utilities/pom.xml | 23 ++ .../apache/xtable/utilities/RunCatalogSync.java | 365 +++++++++++++++++++++ .../java/org/apache/xtable/utilities/RunSync.java | 4 +- .../apache/xtable/utilities/ITRunCatalogSync.java | 142 ++++++++ .../xtable/utilities/TestRunCatalogSync.java | 28 +- .../src/test/resources/catalogConfig.yaml | 71 ++++ 22 files changed, 1441 insertions(+), 109 deletions(-) diff --git a/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java b/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java index 73e2628d..63e9d673 100644 --- a/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java +++ b/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java @@ -18,7 +18,9 @@ package org.apache.xtable.conversion; +import java.util.Collections; import java.util.List; +import java.util.Map; import lombok.Builder; import lombok.NonNull; @@ -29,22 +31,30 @@ import com.google.common.base.Preconditions; import org.apache.xtable.model.sync.SyncMode; @Value +@Builder public class ConversionConfig { // The source of the sync @NonNull SourceTable sourceTable; // One or more targets to sync the table metadata to List<TargetTable> targetTables; + // Each target table can be synced to multiple target catalogs, this is map from + // targetTable to target catalogs. + Map<TargetTable, List<TargetCatalogConfig>> targetCatalogs; // The mode, incremental or snapshot SyncMode syncMode; @Builder ConversionConfig( - @NonNull SourceTable sourceTable, List<TargetTable> targetTables, SyncMode syncMode) { + @NonNull SourceTable sourceTable, + List<TargetTable> targetTables, + Map<TargetTable, List<TargetCatalogConfig>> targetCatalogs, + SyncMode syncMode) { this.sourceTable = sourceTable; this.targetTables = targetTables; Preconditions.checkArgument( targetTables != null && !targetTables.isEmpty(), "Please provide at-least one format to sync"); + this.targetCatalogs = targetCatalogs == null ? Collections.emptyMap() : targetCatalogs; this.syncMode = syncMode == null ? 
SyncMode.INCREMENTAL : syncMode; } } diff --git a/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalCatalogConfig.java b/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalCatalogConfig.java new file mode 100644 index 00000000..b525d831 --- /dev/null +++ b/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalCatalogConfig.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.conversion; + +import java.util.Collections; +import java.util.Map; + +import lombok.Builder; +import lombok.NonNull; +import lombok.Value; + +/** + * Defines the configuration for an external catalog, user needs to populate at-least one of {@link + * ExternalCatalogConfig#catalogType} or {@link ExternalCatalogConfig#catalogSyncClientImpl} + */ +@Value +@Builder +public class ExternalCatalogConfig { + /** + * A user-defined unique identifier for the catalog, allows user to sync table to multiple + * catalogs of the same name/type eg: HMS catalog with url1, HMS catalog with url2 + */ + @NonNull String catalogId; + + /** + * The type of the catalog. If the catalogType implementation exists in XTable, the implementation + * class will be inferred. + */ + String catalogType; + + /** + * (Optional) A fully qualified class name that implements the interface for {@link + * org.apache.xtable.spi.sync.CatalogSyncClient}, it can be used if the implementation for + * catalogType doesn't exist in XTable. + */ + String catalogSyncClientImpl; + + /** + * (Optional) A fully qualified class name that implements the interface for {@link + * org.apache.xtable.spi.extractor.CatalogConversionSource} it can be used if the implementation + * for catalogType doesn't exist in XTable. + */ + String catalogConversionSourceImpl; + + /** + * The properties for this catalog, used for providing any custom behaviour during catalog sync + */ + @NonNull @Builder.Default Map<String, String> catalogProperties = Collections.emptyMap(); +} diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java b/xtable-api/src/main/java/org/apache/xtable/conversion/TargetCatalogConfig.java similarity index 64% copy from xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java copy to xtable-api/src/main/java/org/apache/xtable/conversion/TargetCatalogConfig.java index d5d7a3c5..d6687523 100644 --- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java +++ b/xtable-api/src/main/java/org/apache/xtable/conversion/TargetCatalogConfig.java @@ -16,21 +16,24 @@ * limitations under the License. 
*/ -package org.apache.xtable.iceberg; - -import java.util.Collections; -import java.util.Map; +package org.apache.xtable.conversion; import lombok.Builder; import lombok.NonNull; import lombok.Value; -import org.apache.xtable.conversion.CatalogConfig; +import org.apache.xtable.model.catalog.CatalogTableIdentifier; +/** + * TargetCatalogConfig contains the parameters that are required when syncing {@link TargetTable} to + * a catalog. + */ @Value @Builder -public class IcebergCatalogConfig implements CatalogConfig { - @NonNull String catalogImpl; - @NonNull String catalogName; - @NonNull @Builder.Default Map<String, String> catalogOptions = Collections.emptyMap(); +public class TargetCatalogConfig { + /** The tableIdentifier that will be used when syncing {@link TargetTable} to the catalog. */ + @NonNull CatalogTableIdentifier catalogTableIdentifier; + + /** Configuration for the catalog. */ + @NonNull ExternalCatalogConfig catalogConfig; } diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java b/xtable-api/src/main/java/org/apache/xtable/model/storage/CatalogType.java similarity index 65% copy from xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java copy to xtable-api/src/main/java/org/apache/xtable/model/storage/CatalogType.java index d5d7a3c5..e2b028dc 100644 --- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java +++ b/xtable-api/src/main/java/org/apache/xtable/model/storage/CatalogType.java @@ -16,21 +16,13 @@ * limitations under the License. */ -package org.apache.xtable.iceberg; +package org.apache.xtable.model.storage; -import java.util.Collections; -import java.util.Map; - -import lombok.Builder; -import lombok.NonNull; -import lombok.Value; - -import org.apache.xtable.conversion.CatalogConfig; - -@Value -@Builder -public class IcebergCatalogConfig implements CatalogConfig { - @NonNull String catalogImpl; - @NonNull String catalogName; - @NonNull @Builder.Default Map<String, String> catalogOptions = Collections.emptyMap(); +/** + * Default constants for supported catalog types. + * + * @since 0.1 + */ +public class CatalogType { + public static final String STORAGE = "STORAGE"; } diff --git a/xtable-api/src/main/java/org/apache/xtable/spi/extractor/CatalogConversionSource.java b/xtable-api/src/main/java/org/apache/xtable/spi/extractor/CatalogConversionSource.java index 616f3d45..1525e6fa 100644 --- a/xtable-api/src/main/java/org/apache/xtable/spi/extractor/CatalogConversionSource.java +++ b/xtable-api/src/main/java/org/apache/xtable/spi/extractor/CatalogConversionSource.java @@ -29,4 +29,7 @@ import org.apache.xtable.model.catalog.CatalogTableIdentifier; public interface CatalogConversionSource { /** Returns the source table object present in the catalog. 
*/ SourceTable getSourceTable(CatalogTableIdentifier tableIdentifier); + + /** Returns the {@link org.apache.xtable.model.storage.CatalogType} for the catalog conversion */ + String getCatalogType(); } diff --git a/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSyncClient.java b/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSyncClient.java index 62de9379..cc322854 100644 --- a/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSyncClient.java +++ b/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSyncClient.java @@ -34,6 +34,9 @@ public interface CatalogSyncClient<TABLE> extends AutoCloseable { */ String getCatalogId(); + /** Returns the {@link org.apache.xtable.model.storage.CatalogType} the client syncs to */ + String getCatalogType(); + /** Returns the storage location of the table synced to the catalog. */ String getStorageLocation(TABLE table); diff --git a/xtable-core/pom.xml b/xtable-core/pom.xml index f277495e..80de2299 100644 --- a/xtable-core/pom.xml +++ b/xtable-core/pom.xml @@ -174,4 +174,24 @@ <scope>test</scope> </dependency> </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + <phase>test-compile</phase> + </execution> + </executions> + <configuration> + <skip>false</skip> + </configuration> + </plugin> + </plugins> + </build> </project> diff --git a/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogConversionFactory.java b/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogConversionFactory.java new file mode 100644 index 00000000..add95c21 --- /dev/null +++ b/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogConversionFactory.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.xtable.catalog; + +import java.util.ServiceLoader; +import java.util.function.Function; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; + +import org.apache.xtable.conversion.ExternalCatalogConfig; +import org.apache.xtable.exception.NotSupportedException; +import org.apache.xtable.reflection.ReflectionUtils; +import org.apache.xtable.spi.extractor.CatalogConversionSource; +import org.apache.xtable.spi.sync.CatalogSyncClient; + +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class CatalogConversionFactory { + private static final CatalogConversionFactory INSTANCE = new CatalogConversionFactory(); + + public static CatalogConversionFactory getInstance() { + return INSTANCE; + } + + /** + * Returns an implementation class for {@link CatalogConversionSource} that's used for converting + * table definitions in the catalog to {@link org.apache.xtable.conversion.SourceTable} object. + * + * @param sourceCatalogConfig configuration for the source catalog + * @param configuration hadoop configuration + */ + public static CatalogConversionSource createCatalogConversionSource( + ExternalCatalogConfig sourceCatalogConfig, Configuration configuration) { + if (!StringUtils.isEmpty(sourceCatalogConfig.getCatalogType())) { + return findInstance( + CatalogConversionSource.class, + sourceCatalogConfig.getCatalogType(), + CatalogConversionSource::getCatalogType); + } + return ReflectionUtils.createInstanceOfClass( + sourceCatalogConfig.getCatalogConversionSourceImpl(), sourceCatalogConfig, configuration); + } + + /** + * Returns an implementation class for {@link CatalogSyncClient} that's used for syncing {@link + * org.apache.xtable.conversion.TargetTable} to a catalog. 
+ * + * @param targetCatalogConfig configuration for the target catalog + * @param configuration hadoop configuration + */ + public <TABLE> CatalogSyncClient<TABLE> createCatalogSyncClient( + ExternalCatalogConfig targetCatalogConfig, String tableFormat, Configuration configuration) { + if (!StringUtils.isEmpty(targetCatalogConfig.getCatalogType())) { + return findInstance( + CatalogSyncClient.class, + targetCatalogConfig.getCatalogType(), + CatalogSyncClient::getCatalogType); + } + return ReflectionUtils.createInstanceOfClass( + targetCatalogConfig.getCatalogSyncClientImpl(), + targetCatalogConfig, + tableFormat, + configuration); + } + + private static <T> T findInstance( + Class<T> serviceClass, String catalogType, Function<T, String> catalogTypeExtractor) { + ServiceLoader<T> loader = ServiceLoader.load(serviceClass); + for (T instance : loader) { + String instanceCatalogType = catalogTypeExtractor.apply(instance); + if (catalogType.equals(instanceCatalogType)) { + return instance; + } + } + throw new NotSupportedException("catalogType is not yet supported: " + catalogType); + } +} diff --git a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java index 222652a6..1db145ee 100644 --- a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java +++ b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java @@ -18,6 +18,8 @@ package org.apache.xtable.conversion; +import static org.apache.xtable.conversion.ConversionUtils.convertToSourceTable; + import java.io.IOException; import java.time.Instant; import java.util.Collection; @@ -37,15 +39,19 @@ import lombok.extern.log4j.Log4j2; import org.apache.hadoop.conf.Configuration; +import org.apache.xtable.catalog.CatalogConversionFactory; import org.apache.xtable.exception.ReadException; import org.apache.xtable.model.IncrementalTableChanges; import org.apache.xtable.model.InstantsForIncrementalSync; import org.apache.xtable.model.InternalSnapshot; +import org.apache.xtable.model.catalog.CatalogTableIdentifier; import org.apache.xtable.model.metadata.TableSyncMetadata; import org.apache.xtable.model.sync.SyncMode; import org.apache.xtable.model.sync.SyncResult; import org.apache.xtable.spi.extractor.ConversionSource; import org.apache.xtable.spi.extractor.ExtractFromSource; +import org.apache.xtable.spi.sync.CatalogSync; +import org.apache.xtable.spi.sync.CatalogSyncClient; import org.apache.xtable.spi.sync.ConversionTarget; import org.apache.xtable.spi.sync.TableFormatSync; @@ -64,10 +70,17 @@ import org.apache.xtable.spi.sync.TableFormatSync; public class ConversionController { private final Configuration conf; private final ConversionTargetFactory conversionTargetFactory; + private final CatalogConversionFactory catalogConversionFactory; private final TableFormatSync tableFormatSync; + private final CatalogSync catalogSync; public ConversionController(Configuration conf) { - this(conf, ConversionTargetFactory.getInstance(), TableFormatSync.getInstance()); + this( + conf, + ConversionTargetFactory.getInstance(), + CatalogConversionFactory.getInstance(), + TableFormatSync.getInstance(), + CatalogSync.getInstance()); } /** @@ -89,57 +102,146 @@ public class ConversionController { try (ConversionSource<COMMIT> conversionSource = conversionSourceProvider.getConversionSourceInstance(config.getSourceTable())) { ExtractFromSource<COMMIT> source = ExtractFromSource.of(conversionSource); + return 
syncTableFormats(config, source, config.getSyncMode()); + } catch (IOException ioException) { + throw new ReadException("Failed to close source converter", ioException); + } + } - Map<String, ConversionTarget> conversionTargetByFormat = - config.getTargetTables().stream() - .collect( - Collectors.toMap( - TargetTable::getFormatName, - targetTable -> conversionTargetFactory.createForFormat(targetTable, conf))); - // State for each TableFormat - Map<String, Optional<TableSyncMetadata>> lastSyncMetadataByFormat = - conversionTargetByFormat.entrySet().stream() - .collect( - Collectors.toMap( - Map.Entry::getKey, entry -> entry.getValue().getTableMetadata())); - Map<String, ConversionTarget> formatsToSyncIncrementally = - getFormatsToSyncIncrementally( - config, - conversionTargetByFormat, - lastSyncMetadataByFormat, - source.getConversionSource()); - Map<String, ConversionTarget> formatsToSyncBySnapshot = - conversionTargetByFormat.entrySet().stream() - .filter(entry -> !formatsToSyncIncrementally.containsKey(entry.getKey())) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - SyncResultForTableFormats syncResultForSnapshotSync = - formatsToSyncBySnapshot.isEmpty() - ? SyncResultForTableFormats.builder().build() - : syncSnapshot(formatsToSyncBySnapshot, source); - SyncResultForTableFormats syncResultForIncrementalSync = - formatsToSyncIncrementally.isEmpty() - ? SyncResultForTableFormats.builder().build() - : syncIncrementalChanges( - formatsToSyncIncrementally, lastSyncMetadataByFormat, source); - Map<String, SyncResult> syncResultsMerged = - new HashMap<>(syncResultForIncrementalSync.getLastSyncResult()); - syncResultsMerged.putAll(syncResultForSnapshotSync.getLastSyncResult()); - String successfulSyncs = - getFormatsWithStatusCode(syncResultsMerged, SyncResult.SyncStatusCode.SUCCESS); - if (!successfulSyncs.isEmpty()) { - log.info("Sync is successful for the following formats {}", successfulSyncs); - } - String failedSyncs = - getFormatsWithStatusCode(syncResultsMerged, SyncResult.SyncStatusCode.ERROR); - if (!failedSyncs.isEmpty()) { - log.error("Sync failed for the following formats {}", failedSyncs); + /** + * Synchronizes the source table in conversion config to multiple target catalogs. If the + * configuration for the target table uses a different table format, synchronizes the table format + * first before syncing it to target catalog + * + * @param config A per table level config containing source table, target tables, target catalogs + * and syncMode. + * @param conversionSourceProvider A provider for the {@link ConversionSource} instance for each + * tableFormat, {@link ConversionSourceProvider#init(Configuration)} must be called before + * calling this method. + * @return Returns a map containing the table format, and it's sync result. Run sync for a table * + * with the provided per table level configuration. 
+ */ + public Map<String, SyncResult> syncTableAcrossCatalogs( + ConversionConfig config, Map<String, ConversionSourceProvider> conversionSourceProvider) { + if (config.getTargetTables() == null || config.getTargetTables().isEmpty()) { + throw new IllegalArgumentException("Please provide at-least one format to sync"); + } + try (ConversionSource conversionSource = + conversionSourceProvider + .get(config.getSourceTable().getFormatName()) + .getConversionSourceInstance(config.getSourceTable())) { + ExtractFromSource source = ExtractFromSource.of(conversionSource); + Map<String, SyncResult> tableFormatSyncResults = + syncTableFormats(config, source, config.getSyncMode()); + Map<String, SyncResult> catalogSyncResults = new HashMap<>(); + for (TargetTable targetTable : config.getTargetTables()) { + Map<CatalogTableIdentifier, CatalogSyncClient> catalogSyncClients = + config.getTargetCatalogs().get(targetTable).stream() + .collect( + Collectors.toMap( + TargetCatalogConfig::getCatalogTableIdentifier, + targetCatalog -> + catalogConversionFactory.createCatalogSyncClient( + targetCatalog.getCatalogConfig(), + targetTable.getFormatName(), + conf))); + catalogSyncResults.put( + targetTable.getFormatName(), + syncCatalogsForTargetTable( + targetTable, + catalogSyncClients, + conversionSourceProvider.get(targetTable.getFormatName()))); } - return syncResultsMerged; + mergeSyncResults(tableFormatSyncResults, catalogSyncResults); + return tableFormatSyncResults; } catch (IOException ioException) { throw new ReadException("Failed to close source converter", ioException); } } + /** + * Synchronizes the given source table format metadata in ConversionConfig to multiple target + * formats. + * + * @param config A per table level config containing tableBasePath, partitionFieldSpecConfig, + * targetTableFormats and syncMode. + * @param source An extractor class for {@link ConversionSource} and allows fetching current + * snapshot or incremental table changes. + * @param syncMode sync mode is either FULL or INCREMENTAL. + * @return Returns a map containing the table format, and it's sync result. + */ + private <COMMIT> Map<String, SyncResult> syncTableFormats( + ConversionConfig config, ExtractFromSource<COMMIT> source, SyncMode syncMode) { + Map<String, ConversionTarget> conversionTargetByFormat = + config.getTargetTables().stream() + .filter( + targetTable -> + !targetTable.getFormatName().equals(config.getSourceTable().getFormatName())) + .collect( + Collectors.toMap( + TargetTable::getFormatName, + targetTable -> conversionTargetFactory.createForFormat(targetTable, conf))); + + Map<String, Optional<TableSyncMetadata>> lastSyncMetadataByFormat = + conversionTargetByFormat.entrySet().stream() + .collect( + Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().getTableMetadata())); + Map<String, ConversionTarget> formatsToSyncIncrementally = + getFormatsToSyncIncrementally( + syncMode, + conversionTargetByFormat, + lastSyncMetadataByFormat, + source.getConversionSource()); + Map<String, ConversionTarget> formatsToSyncBySnapshot = + conversionTargetByFormat.entrySet().stream() + .filter(entry -> !formatsToSyncIncrementally.containsKey(entry.getKey())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + SyncResultForTableFormats syncResultForSnapshotSync = + formatsToSyncBySnapshot.isEmpty() + ? 
SyncResultForTableFormats.builder().build() + : syncSnapshot(formatsToSyncBySnapshot, source); + SyncResultForTableFormats syncResultForIncrementalSync = + formatsToSyncIncrementally.isEmpty() + ? SyncResultForTableFormats.builder().build() + : syncIncrementalChanges(formatsToSyncIncrementally, lastSyncMetadataByFormat, source); + Map<String, SyncResult> syncResultsMerged = + new HashMap<>(syncResultForIncrementalSync.getLastSyncResult()); + syncResultsMerged.putAll(syncResultForSnapshotSync.getLastSyncResult()); + String successfulSyncs = + getFormatsWithStatusCode(syncResultsMerged, SyncResult.SyncStatusCode.SUCCESS); + if (!successfulSyncs.isEmpty()) { + log.info("Sync is successful for the following formats {}", successfulSyncs); + } + String failedSyncs = + getFormatsWithStatusCode(syncResultsMerged, SyncResult.SyncStatusCode.ERROR); + if (!failedSyncs.isEmpty()) { + log.error("Sync failed for the following formats {}", failedSyncs); + } + return syncResultsMerged; + } + + /** + * Synchronizes the target table to multiple target catalogs. + * + * @param targetTable target table that needs to synced. + * @param catalogSyncClients Collection of catalog sync clients along with their table identifiers + * for each target catalog. + * @param conversionSourceProvider A provider for the {@link ConversionSource} instance for the + * table format of targetTable. + */ + private SyncResult syncCatalogsForTargetTable( + TargetTable targetTable, + Map<CatalogTableIdentifier, CatalogSyncClient> catalogSyncClients, + ConversionSourceProvider conversionSourceProvider) { + return catalogSync.syncTable( + catalogSyncClients, + // We get the latest state of InternalTable for TargetTable + // and then synchronize it to catalogSyncClients. + conversionSourceProvider + .getConversionSourceInstance(convertToSourceTable(targetTable)) + .getCurrentTable()); + } + private static String getFormatsWithStatusCode( Map<String, SyncResult> syncResultsMerged, SyncResult.SyncStatusCode statusCode) { return syncResultsMerged.entrySet().stream() @@ -149,11 +251,11 @@ public class ConversionController { } private <COMMIT> Map<String, ConversionTarget> getFormatsToSyncIncrementally( - ConversionConfig conversionConfig, + SyncMode syncMode, Map<String, ConversionTarget> conversionTargetByFormat, Map<String, Optional<TableSyncMetadata>> lastSyncMetadataByFormat, ConversionSource<COMMIT> conversionSource) { - if (conversionConfig.getSyncMode() == SyncMode.FULL) { + if (syncMode == SyncMode.FULL) { // Full sync requested by config, hence no incremental sync. 
return Collections.emptyMap(); } @@ -268,6 +370,22 @@ public class ConversionController { .build(); } + private void mergeSyncResults( + Map<String, SyncResult> syncResultsMerged, Map<String, SyncResult> catalogSyncResults) { + catalogSyncResults.forEach( + (tableFormat, catalogSyncResult) -> { + syncResultsMerged.computeIfPresent( + tableFormat, + (k, syncResult) -> + syncResult.toBuilder() + .syncDuration( + syncResult.getSyncDuration().plus(catalogSyncResult.getSyncDuration())) + .catalogSyncStatusList(catalogSyncResult.getCatalogSyncStatusList()) + .build()); + syncResultsMerged.computeIfAbsent(tableFormat, k -> catalogSyncResult); + }); + } + @Value @Builder private static class SyncResultForTableFormats { diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionUtils.java similarity index 65% copy from xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java copy to xtable-core/src/main/java/org/apache/xtable/conversion/ConversionUtils.java index d5d7a3c5..f21be670 100644 --- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java +++ b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionUtils.java @@ -16,21 +16,18 @@ * limitations under the License. */ -package org.apache.xtable.iceberg; +package org.apache.xtable.conversion; -import java.util.Collections; -import java.util.Map; +public class ConversionUtils { -import lombok.Builder; -import lombok.NonNull; -import lombok.Value; - -import org.apache.xtable.conversion.CatalogConfig; - -@Value -@Builder -public class IcebergCatalogConfig implements CatalogConfig { - @NonNull String catalogImpl; - @NonNull String catalogName; - @NonNull @Builder.Default Map<String, String> catalogOptions = Collections.emptyMap(); + public static SourceTable convertToSourceTable(TargetTable table) { + return new SourceTable( + table.getName(), + table.getFormatName(), + table.getBasePath(), + table.getBasePath(), + table.getNamespace(), + table.getCatalogConfig(), + table.getAdditionalProperties()); + } } diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java index d5d7a3c5..e0ec7762 100644 --- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java +++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java @@ -27,10 +27,16 @@ import lombok.Value; import org.apache.xtable.conversion.CatalogConfig; +/** + * Iceberg requires a catalog to perform any operation, if no catalog is provided the default + * catalog (HadoopCatalog or storage based catalog) is used. For syncing iceberg to multiple + * catalogs, you can use {@link org.apache.xtable.conversion.ExternalCatalogConfig} instead which + * allows syncing the latest version of iceberg metadata to multiple catalogs. 
+ */ @Value @Builder public class IcebergCatalogConfig implements CatalogConfig { - @NonNull String catalogImpl; @NonNull String catalogName; + @NonNull String catalogImpl; @NonNull @Builder.Default Map<String, String> catalogOptions = Collections.emptyMap(); } diff --git a/xtable-core/src/test/java/org/apache/xtable/catalog/TestCatalogConversionFactory.java b/xtable-core/src/test/java/org/apache/xtable/catalog/TestCatalogConversionFactory.java new file mode 100644 index 00000000..1d05666b --- /dev/null +++ b/xtable-core/src/test/java/org/apache/xtable/catalog/TestCatalogConversionFactory.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.catalog; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.Collections; + +import org.apache.hadoop.conf.Configuration; +import org.junit.jupiter.api.Test; + +import org.apache.xtable.conversion.ExternalCatalogConfig; +import org.apache.xtable.conversion.TargetCatalogConfig; +import org.apache.xtable.model.catalog.ThreePartHierarchicalTableIdentifier; +import org.apache.xtable.spi.extractor.CatalogConversionSource; +import org.apache.xtable.spi.sync.CatalogSyncClient; +import org.apache.xtable.testutil.ITTestUtils; +import org.apache.xtable.testutil.ITTestUtils.TestCatalogConversionSourceImpl; +import org.apache.xtable.testutil.ITTestUtils.TestCatalogSyncImpl; + +class TestCatalogConversionFactory { + + @Test + void createCatalogConversionSource() { + ExternalCatalogConfig sourceCatalog = + ExternalCatalogConfig.builder() + .catalogId("catalogId") + .catalogConversionSourceImpl(TestCatalogConversionSourceImpl.class.getName()) + .catalogProperties(Collections.emptyMap()) + .build(); + CatalogConversionSource catalogConversionSource = + CatalogConversionFactory.createCatalogConversionSource(sourceCatalog, new Configuration()); + assertEquals( + catalogConversionSource.getClass().getName(), + TestCatalogConversionSourceImpl.class.getName()); + } + + @Test + void createCatalogConversionSourceForCatalogType() { + ExternalCatalogConfig sourceCatalog = + ExternalCatalogConfig.builder() + .catalogId("catalogId") + .catalogType(ITTestUtils.TEST_CATALOG_TYPE) + .catalogProperties(Collections.emptyMap()) + .build(); + CatalogConversionSource catalogConversionSource = + CatalogConversionFactory.createCatalogConversionSource(sourceCatalog, new Configuration()); + assertEquals( + catalogConversionSource.getClass().getName(), + TestCatalogConversionSourceImpl.class.getName()); + } + + @Test + void createCatalogSyncClient() { + TargetCatalogConfig targetCatalogConfig = + TargetCatalogConfig.builder() + .catalogConfig( + ExternalCatalogConfig.builder() + .catalogId("catalogId") + 
.catalogSyncClientImpl(TestCatalogSyncImpl.class.getName()) + .catalogProperties(Collections.emptyMap()) + .build()) + .catalogTableIdentifier( + new ThreePartHierarchicalTableIdentifier("target-database", "target-tableName")) + .build(); + CatalogSyncClient catalogSyncClient = + CatalogConversionFactory.getInstance() + .createCatalogSyncClient( + targetCatalogConfig.getCatalogConfig(), "TABLE_FORMAT", new Configuration()); + assertEquals(catalogSyncClient.getClass().getName(), TestCatalogSyncImpl.class.getName()); + } + + @Test + void createCatalogSyncClientForCatalogType() { + TargetCatalogConfig targetCatalogConfig = + TargetCatalogConfig.builder() + .catalogConfig( + ExternalCatalogConfig.builder() + .catalogId("catalogId") + .catalogType(ITTestUtils.TEST_CATALOG_TYPE) + .catalogProperties(Collections.emptyMap()) + .build()) + .catalogTableIdentifier( + new ThreePartHierarchicalTableIdentifier("target-database", "target-tableName")) + .build(); + CatalogSyncClient catalogSyncClient = + CatalogConversionFactory.getInstance() + .createCatalogSyncClient( + targetCatalogConfig.getCatalogConfig(), "TABLE_FORMAT", new Configuration()); + assertEquals(catalogSyncClient.getClass().getName(), TestCatalogSyncImpl.class.getName()); + } +} diff --git a/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java index caba8046..0f34103e 100644 --- a/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java +++ b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java @@ -18,8 +18,12 @@ package org.apache.xtable.conversion; +import static org.apache.xtable.conversion.ConversionUtils.convertToSourceTable; +import static org.apache.xtable.model.storage.TableFormat.DELTA; import static org.apache.xtable.model.storage.TableFormat.HUDI; +import static org.apache.xtable.model.storage.TableFormat.ICEBERG; import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; @@ -36,6 +40,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -43,17 +48,23 @@ import org.apache.hadoop.conf.Configuration; import org.junit.jupiter.api.Test; import org.mockito.ArgumentMatcher; +import com.google.common.collect.ImmutableMap; + +import org.apache.xtable.catalog.CatalogConversionFactory; import org.apache.xtable.model.CommitsBacklog; import org.apache.xtable.model.IncrementalTableChanges; import org.apache.xtable.model.InstantsForIncrementalSync; import org.apache.xtable.model.InternalSnapshot; import org.apache.xtable.model.InternalTable; import org.apache.xtable.model.TableChange; +import org.apache.xtable.model.catalog.ThreePartHierarchicalTableIdentifier; import org.apache.xtable.model.metadata.TableSyncMetadata; import org.apache.xtable.model.storage.TableFormat; import org.apache.xtable.model.sync.SyncMode; import org.apache.xtable.model.sync.SyncResult; import org.apache.xtable.spi.extractor.ConversionSource; +import org.apache.xtable.spi.sync.CatalogSync; +import org.apache.xtable.spi.sync.CatalogSyncClient; import org.apache.xtable.spi.sync.ConversionTarget; import org.apache.xtable.spi.sync.TableFormatSync; @@ -62,12 
+73,22 @@ public class TestConversionController { private final Configuration mockConf = mock(Configuration.class); private final ConversionSourceProvider<Instant> mockConversionSourceProvider = mock(ConversionSourceProvider.class); + private final ConversionSourceProvider<Instant> mockConversionSourceProvider2 = + mock(ConversionSourceProvider.class); + private final ConversionSourceProvider<Instant> mockConversionSourceProvider3 = + mock(ConversionSourceProvider.class); + private final ConversionSource<Instant> mockConversionSource = mock(ConversionSource.class); private final ConversionTargetFactory mockConversionTargetFactory = mock(ConversionTargetFactory.class); + private final CatalogConversionFactory mockCatalogConversionFactory = + mock(CatalogConversionFactory.class); private final TableFormatSync tableFormatSync = mock(TableFormatSync.class); + private final CatalogSync catalogSync = mock(CatalogSync.class); private final ConversionTarget mockConversionTarget1 = mock(ConversionTarget.class); private final ConversionTarget mockConversionTarget2 = mock(ConversionTarget.class); + private final CatalogSyncClient mockCatalogSyncClient1 = mock(CatalogSyncClient.class); + private final CatalogSyncClient mockCatalogSyncClient2 = mock(CatalogSyncClient.class); @Test void testAllSnapshotSyncAsPerConfig() { @@ -96,7 +117,12 @@ public class TestConversionController { eq(internalSnapshot))) .thenReturn(perTableResults); ConversionController conversionController = - new ConversionController(mockConf, mockConversionTargetFactory, tableFormatSync); + new ConversionController( + mockConf, + mockConversionTargetFactory, + mockCatalogConversionFactory, + tableFormatSync, + catalogSync); Map<String, SyncResult> result = conversionController.sync(conversionConfig, mockConversionSourceProvider); assertEquals(perTableResults, result); @@ -182,7 +208,12 @@ public class TestConversionController { expectedSyncResult.put(TableFormat.ICEBERG, getLastSyncResult(icebergSyncResults)); expectedSyncResult.put(TableFormat.DELTA, getLastSyncResult(deltaSyncResults)); ConversionController conversionController = - new ConversionController(mockConf, mockConversionTargetFactory, tableFormatSync); + new ConversionController( + mockConf, + mockConversionTargetFactory, + mockCatalogConversionFactory, + tableFormatSync, + catalogSync); Map<String, SyncResult> result = conversionController.sync(conversionConfig, mockConversionSourceProvider); assertEquals(expectedSyncResult, result); @@ -226,7 +257,12 @@ public class TestConversionController { eq(internalSnapshot))) .thenReturn(syncResults); ConversionController conversionController = - new ConversionController(mockConf, mockConversionTargetFactory, tableFormatSync); + new ConversionController( + mockConf, + mockConversionTargetFactory, + mockCatalogConversionFactory, + tableFormatSync, + catalogSync); Map<String, SyncResult> result = conversionController.sync(conversionConfig, mockConversionSourceProvider); assertEquals(syncResults, result); @@ -310,7 +346,12 @@ public class TestConversionController { expectedSyncResult.put(TableFormat.ICEBERG, syncResult); expectedSyncResult.put(TableFormat.DELTA, getLastSyncResult(deltaSyncResults)); ConversionController conversionController = - new ConversionController(mockConf, mockConversionTargetFactory, tableFormatSync); + new ConversionController( + mockConf, + mockConversionTargetFactory, + mockCatalogConversionFactory, + tableFormatSync, + catalogSync); Map<String, SyncResult> result = 
conversionController.sync(conversionConfig, mockConversionSourceProvider); assertEquals(expectedSyncResult, result); @@ -368,16 +409,104 @@ public class TestConversionController { // Iceberg and Delta have no commits to sync Map<String, SyncResult> expectedSyncResult = Collections.emptyMap(); ConversionController conversionController = - new ConversionController(mockConf, mockConversionTargetFactory, tableFormatSync); + new ConversionController( + mockConf, + mockConversionTargetFactory, + mockCatalogConversionFactory, + tableFormatSync, + catalogSync); Map<String, SyncResult> result = conversionController.sync(conversionConfig, mockConversionSourceProvider); assertEquals(expectedSyncResult, result); } + @Test + void testNoTableFormatConversionWithMultipleCatalogSync() { + SyncMode syncMode = SyncMode.INCREMENTAL; + List<TargetCatalogConfig> targetCatalogs = + Arrays.asList(getTargetCatalog("1"), getTargetCatalog("2")); + InternalTable internalTable = getInternalTable(); + InternalSnapshot internalSnapshot = buildSnapshot(internalTable, "v1"); + // Conversion source and target mocks. + ConversionConfig conversionConfig = + getTableSyncConfig( + Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA), syncMode, targetCatalogs); + when(mockConversionSourceProvider.getConversionSourceInstance( + conversionConfig.getSourceTable())) + .thenReturn(mockConversionSource); + when(mockConversionSourceProvider.getConversionSourceInstance( + convertToSourceTable(conversionConfig.getTargetTables().get(0)))) + .thenReturn(mockConversionSource); + when(mockConversionSourceProvider.getConversionSourceInstance( + convertToSourceTable(conversionConfig.getTargetTables().get(1)))) + .thenReturn(mockConversionSource); + when(mockConversionTargetFactory.createForFormat( + conversionConfig.getTargetTables().get(0), mockConf)) + .thenReturn(mockConversionTarget1); + when(mockConversionTargetFactory.createForFormat( + conversionConfig.getTargetTables().get(1), mockConf)) + .thenReturn(mockConversionTarget2); + when(mockConversionSource.getCurrentSnapshot()).thenReturn(internalSnapshot); + when(mockConversionSource.getCurrentTable()).thenReturn(getInternalTable()); + // Mocks for tableFormatSync. + Instant instantBeforeHour = Instant.now().minus(Duration.ofHours(1)); + Instant syncStartTime = Instant.now(); + SyncResult syncResult = + buildSyncResult(syncMode, instantBeforeHour, syncStartTime, Duration.ofSeconds(1)); + Map<String, SyncResult> tableFormatSyncResults = + buildPerTableResult(Arrays.asList(ICEBERG, DELTA), syncResult); + when(tableFormatSync.syncSnapshot( + argThat(containsAll(Arrays.asList(mockConversionTarget1, mockConversionTarget2))), + eq(internalSnapshot))) + .thenReturn(tableFormatSyncResults); + // Mocks for catalogSync. 
+ when(mockCatalogConversionFactory.createCatalogSyncClient( + eq(targetCatalogs.get(0).getCatalogConfig()), any(), eq(mockConf))) + .thenReturn(mockCatalogSyncClient1); + when(mockCatalogConversionFactory.createCatalogSyncClient( + eq(targetCatalogs.get(1).getCatalogConfig()), any(), eq(mockConf))) + .thenReturn(mockCatalogSyncClient2); + when(catalogSync.syncTable( + eq( + ImmutableMap.of( + targetCatalogs.get(0).getCatalogTableIdentifier(), mockCatalogSyncClient1, + targetCatalogs.get(1).getCatalogTableIdentifier(), mockCatalogSyncClient2)), + any())) + .thenReturn( + buildSyncResult(syncMode, syncStartTime, instantBeforeHour, Duration.ofSeconds(3))); + ConversionController conversionController = + new ConversionController( + mockConf, + mockConversionTargetFactory, + mockCatalogConversionFactory, + tableFormatSync, + catalogSync); + // Mocks for conversionSourceProviders. + Map<String, ConversionSourceProvider> conversionSourceProviders = new HashMap<>(); + conversionSourceProviders.put(HUDI, mockConversionSourceProvider); + conversionSourceProviders.put(ICEBERG, mockConversionSourceProvider); + conversionSourceProviders.put(DELTA, mockConversionSourceProvider); + // Assert results. + Map<String, SyncResult> mergedSyncResults = + buildPerTableResult( + Arrays.asList(ICEBERG, DELTA), + syncResult.toBuilder().syncDuration(Duration.ofSeconds(4)).build()); + Map<String, SyncResult> result = + conversionController.syncTableAcrossCatalogs(conversionConfig, conversionSourceProviders); + assertEquals(mergedSyncResults, result); + } + private SyncResult getLastSyncResult(List<SyncResult> syncResults) { return syncResults.get(syncResults.size() - 1); } + private Map<String, SyncResult> buildPerTableResult( + List<String> tableFormats, SyncResult syncResult) { + Map<String, SyncResult> perTableResults = new HashMap<>(); + tableFormats.forEach(tableFormat -> perTableResults.put(tableFormat, syncResult)); + return perTableResults; + } + private List<SyncResult> buildSyncResults(List<Instant> instantList) { return instantList.stream() .map(instant -> buildSyncResult(SyncMode.INCREMENTAL, instant)) @@ -396,6 +525,17 @@ public class TestConversionController { .build(); } + private SyncResult buildSyncResult( + SyncMode syncMode, Instant syncStartTime, Instant lastSyncedInstant, Duration duration) { + return SyncResult.builder() + .mode(syncMode) + .lastInstantSynced(lastSyncedInstant) + .syncStartTime(syncStartTime) + .syncDuration(duration) + .tableFormatSyncStatus(SyncResult.SyncStatus.SUCCESS) + .build(); + } + private InternalSnapshot buildSnapshot(InternalTable internalTable, String version) { return InternalSnapshot.builder().table(internalTable).version(version).build(); } @@ -413,6 +553,13 @@ public class TestConversionController { } private ConversionConfig getTableSyncConfig(List<String> targetTableFormats, SyncMode syncMode) { + return getTableSyncConfig(targetTableFormats, syncMode, Collections.emptyList()); + } + + private ConversionConfig getTableSyncConfig( + List<String> targetTableFormats, + SyncMode syncMode, + List<TargetCatalogConfig> targetCatalogs) { SourceTable sourceTable = SourceTable.builder() .name("tablename") @@ -434,10 +581,27 @@ public class TestConversionController { return ConversionConfig.builder() .sourceTable(sourceTable) .targetTables(targetTables) + .targetCatalogs( + targetTables.stream() + .collect(Collectors.toMap(Function.identity(), k -> targetCatalogs))) .syncMode(syncMode) .build(); } + private TargetCatalogConfig getTargetCatalog(String suffix) { + return 
TargetCatalogConfig.builder() + .catalogConfig( + ExternalCatalogConfig.builder() + .catalogId("catalogId-" + suffix) + .catalogSyncClientImpl("catalogImpl-" + suffix) + .catalogProperties(Collections.emptyMap()) + .build()) + .catalogTableIdentifier( + new ThreePartHierarchicalTableIdentifier( + "target-database" + suffix, "target-tableName" + suffix)) + .build(); + } + private static <T> ArgumentMatcher<Collection<T>> containsAll(Collection<T> expected) { return actual -> actual.size() == expected.size() && actual.containsAll(expected); } diff --git a/xtable-core/src/test/java/org/apache/xtable/testutil/ITTestUtils.java b/xtable-core/src/test/java/org/apache/xtable/testutil/ITTestUtils.java index 281e61fe..ce374f39 100644 --- a/xtable-core/src/test/java/org/apache/xtable/testutil/ITTestUtils.java +++ b/xtable-core/src/test/java/org/apache/xtable/testutil/ITTestUtils.java @@ -18,16 +18,25 @@ package org.apache.xtable.testutil; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import org.apache.hadoop.conf.Configuration; import org.junit.jupiter.api.Assertions; +import org.apache.xtable.conversion.ExternalCatalogConfig; +import org.apache.xtable.conversion.SourceTable; import org.apache.xtable.model.InternalTable; +import org.apache.xtable.model.catalog.CatalogTableIdentifier; import org.apache.xtable.model.schema.InternalPartitionField; import org.apache.xtable.model.schema.InternalSchema; import org.apache.xtable.model.storage.DataLayoutStrategy; +import org.apache.xtable.spi.extractor.CatalogConversionSource; +import org.apache.xtable.spi.sync.CatalogSyncClient; public class ITTestUtils { + public static final String TEST_CATALOG_TYPE = "test"; public static void validateTable( InternalTable internalTable, @@ -44,4 +53,103 @@ public class ITTestUtils { Assertions.assertEquals(basePath, internalTable.getBasePath()); Assertions.assertEquals(partitioningFields, internalTable.getPartitioningFields()); } + + public static class TestCatalogSyncImpl implements CatalogSyncClient { + private static final Map<String, Integer> FUNCTION_CALLS = new HashMap<>(); + + public TestCatalogSyncImpl( + ExternalCatalogConfig catalogConfig, String tableFormat, Configuration hadoopConf) {} + + public TestCatalogSyncImpl() {} + + @Override + public String getCatalogId() { + trackFunctionCall(); + return null; + } + + @Override + public String getCatalogType() { + return TEST_CATALOG_TYPE; + } + + @Override + public String getStorageLocation(Object o) { + trackFunctionCall(); + return null; + } + + @Override + public boolean hasDatabase(CatalogTableIdentifier tableIdentifier) { + trackFunctionCall(); + return false; + } + + @Override + public void createDatabase(CatalogTableIdentifier tableIdentifier) { + trackFunctionCall(); + } + + @Override + public Object getTable(CatalogTableIdentifier tableIdentifier) { + trackFunctionCall(); + return null; + } + + @Override + public void createTable(InternalTable table, CatalogTableIdentifier tableIdentifier) { + trackFunctionCall(); + } + + @Override + public void refreshTable( + InternalTable table, Object catalogTable, CatalogTableIdentifier tableIdentifier) { + trackFunctionCall(); + } + + @Override + public void createOrReplaceTable(InternalTable table, CatalogTableIdentifier tableIdentifier) { + trackFunctionCall(); + } + + @Override + public void dropTable(InternalTable table, CatalogTableIdentifier tableIdentifier) { + trackFunctionCall(); + } + + @Override + public void close() throws Exception { + trackFunctionCall(); + } + + private 
void trackFunctionCall() { + String methodName = Thread.currentThread().getStackTrace()[2].getMethodName(); + FUNCTION_CALLS.put(methodName, FUNCTION_CALLS.getOrDefault(methodName, 0) + 1); + } + + public static Map<String, Integer> getFunctionCalls() { + return FUNCTION_CALLS; + } + } + + public static class TestCatalogConversionSourceImpl implements CatalogConversionSource { + public TestCatalogConversionSourceImpl( + ExternalCatalogConfig sourceCatalogConfig, Configuration configuration) {} + + public TestCatalogConversionSourceImpl() {} + + @Override + public SourceTable getSourceTable(CatalogTableIdentifier tableIdentifier) { + return SourceTable.builder() + .name("source_table_name") + .basePath("file://base_path/v1/") + .formatName("ICEBERG") + .build(); + } + + @Override + public String getCatalogType() { + return TEST_CATALOG_TYPE; + } + } } diff --git a/xtable-core/src/test/resources/META-INF/services/org.apache.xtable.spi.extractor.CatalogConversionSource b/xtable-core/src/test/resources/META-INF/services/org.apache.xtable.spi.extractor.CatalogConversionSource new file mode 100644 index 00000000..ceb8a2ff --- /dev/null +++ b/xtable-core/src/test/resources/META-INF/services/org.apache.xtable.spi.extractor.CatalogConversionSource @@ -0,0 +1,18 @@ +########################################################################## +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +########################################################################## +org.apache.xtable.testutil.ITTestUtils$TestCatalogConversionSourceImpl \ No newline at end of file diff --git a/xtable-core/src/test/resources/META-INF/services/org.apache.xtable.spi.sync.CatalogSyncClient b/xtable-core/src/test/resources/META-INF/services/org.apache.xtable.spi.sync.CatalogSyncClient new file mode 100644 index 00000000..4e571e3e --- /dev/null +++ b/xtable-core/src/test/resources/META-INF/services/org.apache.xtable.spi.sync.CatalogSyncClient @@ -0,0 +1,18 @@ +########################################################################## +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +########################################################################## +org.apache.xtable.testutil.ITTestUtils$TestCatalogSyncImpl \ No newline at end of file diff --git a/xtable-utilities/pom.xml b/xtable-utilities/pom.xml index bc91f99e..979cb456 100644 --- a/xtable-utilities/pom.xml +++ b/xtable-utilities/pom.xml @@ -35,6 +35,15 @@ <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.xtable</groupId> + <artifactId>xtable-core_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <classifier>tests</classifier> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <!-- command line arg parsing --> <dependency> <groupId>commons-cli</groupId> @@ -125,6 +134,20 @@ <artifactId>junit-jupiter-engine</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.xtable</groupId> + <artifactId>xtable-core_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <classifier>tests</classifier> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.hudi</groupId> + <artifactId>hudi-spark${spark.version.prefix}-bundle_${scala.binary.version}</artifactId> + <scope>test</scope> + </dependency> </dependencies> <build> diff --git a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java new file mode 100644 index 00000000..60a62cd9 --- /dev/null +++ b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java @@ -0,0 +1,365 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.xtable.utilities; + +import static org.apache.xtable.utilities.RunSync.getCustomConfigurations; +import static org.apache.xtable.utilities.RunSync.loadHadoopConf; +import static org.apache.xtable.utilities.RunSync.loadTableFormatConversionConfigs; + +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import lombok.Builder; +import lombok.Value; +import lombok.extern.jackson.Jacksonized; +import lombok.extern.log4j.Log4j2; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.DefaultParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.hadoop.conf.Configuration; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; + +import org.apache.xtable.catalog.CatalogConversionFactory; +import org.apache.xtable.conversion.ConversionConfig; +import org.apache.xtable.conversion.ConversionController; +import org.apache.xtable.conversion.ConversionSourceProvider; +import org.apache.xtable.conversion.ExternalCatalogConfig; +import org.apache.xtable.conversion.SourceTable; +import org.apache.xtable.conversion.TargetCatalogConfig; +import org.apache.xtable.conversion.TargetTable; +import org.apache.xtable.hudi.HudiSourceConfig; +import org.apache.xtable.model.catalog.CatalogTableIdentifier; +import org.apache.xtable.model.catalog.HierarchicalTableIdentifier; +import org.apache.xtable.model.catalog.ThreePartHierarchicalTableIdentifier; +import org.apache.xtable.model.storage.CatalogType; +import org.apache.xtable.model.sync.SyncMode; +import org.apache.xtable.reflection.ReflectionUtils; +import org.apache.xtable.spi.extractor.CatalogConversionSource; +import org.apache.xtable.utilities.RunCatalogSync.DatasetConfig.StorageIdentifier; +import org.apache.xtable.utilities.RunCatalogSync.DatasetConfig.TableIdentifier; +import org.apache.xtable.utilities.RunCatalogSync.DatasetConfig.TargetTableIdentifier; +import org.apache.xtable.utilities.RunSync.TableFormatConverters; + +/** + * Provides standalone process for reading tables from a source catalog and synchronizing their + * state in target tables, supports table format conversion as well if the target table chooses a + * different format from source table. 
+ */
+@Log4j2
+public class RunCatalogSync {
+  public static final ObjectMapper YAML_MAPPER = new ObjectMapper(new YAMLFactory());
+  private static final String CATALOG_SOURCE_AND_TARGET_CONFIG_PATH = "catalogConfig";
+  private static final String HADOOP_CONFIG_PATH = "hadoopConfig";
+  private static final String CONVERTERS_CONFIG_PATH = "convertersConfig";
+  private static final String HELP_OPTION = "h";
+  private static final Map<String, ConversionSourceProvider> CONVERSION_SOURCE_PROVIDERS =
+      new HashMap<>();
+
+  private static final Options OPTIONS =
+      new Options()
+          .addRequiredOption(
+              CATALOG_SOURCE_AND_TARGET_CONFIG_PATH,
+              "catalogSyncConfig",
+              true,
+              "The path to a yaml file containing source and target table catalog configurations along with the table identifiers that need to be synced")
+          .addOption(
+              HADOOP_CONFIG_PATH,
+              "hadoopConfig",
+              true,
+              "Hadoop config xml file path containing configs necessary to access the "
+                  + "file system. These configs will override the default configs.")
+          .addOption(
+              CONVERTERS_CONFIG_PATH,
+              "convertersConfig",
+              true,
+              "The path to a yaml file containing InternalTable converter configurations. "
+                  + "These configs will override the defaults.")
+          .addOption(HELP_OPTION, "help", false, "Displays help information to run this utility");
+
+  public static void main(String[] args) throws Exception {
+    CommandLineParser parser = new DefaultParser();
+    CommandLine cmd;
+    try {
+      cmd = parser.parse(OPTIONS, args);
+    } catch (ParseException e) {
+      new HelpFormatter().printHelp("xtable.jar", OPTIONS, true);
+      return;
+    }
+
+    if (cmd.hasOption(HELP_OPTION)) {
+      HelpFormatter formatter = new HelpFormatter();
+      formatter.printHelp("RunCatalogSync", OPTIONS);
+      return;
+    }
+
+    DatasetConfig datasetConfig;
+    try (InputStream inputStream =
+        Files.newInputStream(
+            Paths.get(cmd.getOptionValue(CATALOG_SOURCE_AND_TARGET_CONFIG_PATH)))) {
+      datasetConfig = YAML_MAPPER.readValue(inputStream, DatasetConfig.class);
+    }
+
+    byte[] customConfig = getCustomConfigurations(cmd, HADOOP_CONFIG_PATH);
+    Configuration hadoopConf = loadHadoopConf(customConfig);
+
+    customConfig = getCustomConfigurations(cmd, CONVERTERS_CONFIG_PATH);
+    TableFormatConverters tableFormatConverters = loadTableFormatConversionConfigs(customConfig);
+
+    Map<String, ExternalCatalogConfig> catalogsById =
+        datasetConfig.getTargetCatalogs().stream()
+            .collect(Collectors.toMap(ExternalCatalogConfig::getCatalogId, Function.identity()));
+    Optional<CatalogConversionSource> catalogConversionSource =
+        getCatalogConversionSource(datasetConfig.getSourceCatalog(), hadoopConf);
+    ConversionController conversionController = new ConversionController(hadoopConf);
+    for (DatasetConfig.Dataset dataset : datasetConfig.getDatasets()) {
+      SourceTable sourceTable =
+          getSourceTable(dataset.getSourceCatalogTableIdentifier(), catalogConversionSource);
+      List<TargetTable> targetTables = new ArrayList<>();
+      Map<TargetTable, List<TargetCatalogConfig>> targetCatalogs = new HashMap<>();
+      for (TargetTableIdentifier targetCatalogTableIdentifier :
+          dataset.getTargetCatalogTableIdentifiers()) {
+        TargetTable targetTable =
+            TargetTable.builder()
+                .name(sourceTable.getName())
+                .basePath(sourceTable.getBasePath())
+                .namespace(sourceTable.getNamespace())
+                .formatName(targetCatalogTableIdentifier.getTableFormat())
+                .build();
+        targetTables.add(targetTable);
+        if (!targetCatalogs.containsKey(targetTable)) {
+          targetCatalogs.put(targetTable, new ArrayList<>());
+        }
+        targetCatalogs
+            .get(targetTable)
+            .add(
+
TargetCatalogConfig.builder() + .catalogTableIdentifier( + getCatalogTableIdentifier( + targetCatalogTableIdentifier.getTableIdentifier())) + .catalogConfig(catalogsById.get(targetCatalogTableIdentifier.getCatalogId())) + .build()); + } + ConversionConfig conversionConfig = + ConversionConfig.builder() + .sourceTable(sourceTable) + .targetTables(targetTables) + .targetCatalogs(targetCatalogs) + .syncMode(SyncMode.INCREMENTAL) + .build(); + List<String> tableFormats = + Stream.concat( + Stream.of(sourceTable.getFormatName()), + targetTables.stream().map(TargetTable::getFormatName)) + .distinct() + .collect(Collectors.toList()); + try { + conversionController.syncTableAcrossCatalogs( + conversionConfig, + getConversionSourceProviders(tableFormats, tableFormatConverters, hadoopConf)); + } catch (Exception e) { + log.error("Error running sync for {}", sourceTable.getBasePath(), e); + } + } + } + + static Optional<CatalogConversionSource> getCatalogConversionSource( + ExternalCatalogConfig sourceCatalog, Configuration hadoopConf) { + if (CatalogType.STORAGE.equals(sourceCatalog.getCatalogType())) { + return Optional.empty(); + } + return Optional.of( + CatalogConversionFactory.createCatalogConversionSource(sourceCatalog, hadoopConf)); + } + + static SourceTable getSourceTable( + DatasetConfig.SourceTableIdentifier sourceTableIdentifier, + Optional<CatalogConversionSource> catalogConversionSource) { + SourceTable sourceTable = null; + if (sourceTableIdentifier.getStorageIdentifier() != null) { + StorageIdentifier storageIdentifier = sourceTableIdentifier.getStorageIdentifier(); + Properties sourceProperties = new Properties(); + if (storageIdentifier.getPartitionSpec() != null) { + sourceProperties.put( + HudiSourceConfig.PARTITION_FIELD_SPEC_CONFIG, storageIdentifier.getPartitionSpec()); + } + sourceTable = + SourceTable.builder() + .name(storageIdentifier.getTableName()) + .basePath(storageIdentifier.getTableBasePath()) + .namespace( + storageIdentifier.getNamespace() == null + ? null + : storageIdentifier.getNamespace().split("\\.")) + .dataPath(storageIdentifier.getTableDataPath()) + .formatName(storageIdentifier.getTableFormat()) + .additionalProperties(sourceProperties) + .build(); + } else if (catalogConversionSource.isPresent()) { + sourceTable = + catalogConversionSource + .get() + .getSourceTable( + getCatalogTableIdentifier(sourceTableIdentifier.getTableIdentifier())); + } + return sourceTable; + } + + static Map<String, ConversionSourceProvider> getConversionSourceProviders( + List<String> tableFormats, + TableFormatConverters tableFormatConverters, + Configuration hadoopConf) { + for (String tableFormat : tableFormats) { + if (CONVERSION_SOURCE_PROVIDERS.containsKey(tableFormat)) { + continue; + } + TableFormatConverters.ConversionConfig sourceConversionConfig = + tableFormatConverters.getTableFormatConverters().get(tableFormat); + if (sourceConversionConfig == null) { + throw new IllegalArgumentException( + String.format( + "Source format %s is not supported. 
Known source and target formats are %s",
+                tableFormat, tableFormatConverters.getTableFormatConverters().keySet()));
+      }
+      String sourceProviderClass = sourceConversionConfig.conversionSourceProviderClass;
+      ConversionSourceProvider<?> conversionSourceProvider =
+          ReflectionUtils.createInstanceOfClass(sourceProviderClass);
+      conversionSourceProvider.init(hadoopConf);
+      CONVERSION_SOURCE_PROVIDERS.put(tableFormat, conversionSourceProvider);
+    }
+    return CONVERSION_SOURCE_PROVIDERS;
+  }
+
+  /**
+   * Returns an implementation of {@link CatalogTableIdentifier} based on the tableIdentifier
+   * provided by the user.
+   */
+  static CatalogTableIdentifier getCatalogTableIdentifier(TableIdentifier tableIdentifier) {
+    if (tableIdentifier.getHierarchicalId() != null) {
+      return ThreePartHierarchicalTableIdentifier.fromDotSeparatedIdentifier(
+          tableIdentifier.getHierarchicalId());
+    }
+    throw new IllegalArgumentException("Invalid tableIdentifier configuration provided");
+  }
+
+  @Value
+  @Builder
+  @Jacksonized
+  public static class DatasetConfig {
+    /**
+     * Configuration of the source catalog from which XTable will read. It must contain all the
+     * necessary connection and access details for describing and listing tables.
+     */
+    ExternalCatalogConfig sourceCatalog;
+    /**
+     * Defines configuration for one or more target catalogs, to which XTable will write or update
+     * tables. Unlike the source, these catalogs must be writable.
+     */
+    List<ExternalCatalogConfig> targetCatalogs;
+    /** A list of datasets that specify how a source table maps to one or more target tables. */
+    List<Dataset> datasets;
+
+    /** Configuration for the catalog. */
+    ExternalCatalogConfig catalogConfig;
+
+    @Value
+    @Builder
+    @Jacksonized
+    public static class Dataset {
+      /** Identifies the source table in sourceCatalog. */
+      SourceTableIdentifier sourceCatalogTableIdentifier;
+      /** A list of one or more targets that this source table should be written to. */
+      List<TargetTableIdentifier> targetCatalogTableIdentifiers;
+    }
+
+    @Value
+    @Builder
+    @Jacksonized
+    public static class SourceTableIdentifier {
+      /** Specifies the table identifier in the source catalog. */
+      TableIdentifier tableIdentifier;
+      /**
+       * (Optional) Provides direct storage details such as a table’s base path (like an S3
+       * location) and the partition specification. This allows reading from a source even if it is
+       * not strictly registered in a catalog, as long as the format and location are known.
+       */
+      StorageIdentifier storageIdentifier;
+    }
+
+    @Value
+    @Builder
+    @Jacksonized
+    public static class TargetTableIdentifier {
+      /**
+       * The user-defined unique identifier of the target catalog where the table will be created
+       * or updated.
+       */
+      String catalogId;
+      /**
+       * The target table format (e.g., DELTA, HUDI, ICEBERG), specifying how the data will be
+       * stored at the target.
+       */
+      String tableFormat;
+      /** Specifies the table identifier in the target catalog. */
+      TableIdentifier tableIdentifier;
+    }
+
+    @Value
+    @Builder
+    @Jacksonized
+    public static class TableIdentifier {
+      /**
+       * Specifies the three-level hierarchical table identifier for {@link
+       * HierarchicalTableIdentifier}.
+       */
+      String hierarchicalId;
+    }
+
+    /**
+     * Storage configuration for a table. This is an optional field in {@link
+     * SourceTableIdentifier}.
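+     *
+     * <p>For example (values are illustrative), a Hudi table that exists only on storage can be
+     * described with tableFormat "HUDI", tableBasePath "s3://bucket/path/catalog_sales", and
+     * partitionSpec "cs_sold_date_sk:VALUE".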
+ */ + @Value + @Builder + @Jacksonized + public static class StorageIdentifier { + String tableFormat; + String tableBasePath; + String tableDataPath; + String tableName; + String partitionSpec; + String namespace; + } + } +} diff --git a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java index c84753de..1a7bda87 100644 --- a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java +++ b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java @@ -190,12 +190,12 @@ public class RunSync { try { conversionController.sync(conversionConfig, conversionSourceProvider); } catch (Exception e) { - log.error(String.format("Error running sync for %s", table.getTableBasePath()), e); + log.error("Error running sync for {}", table.getTableBasePath(), e); } } } - private static byte[] getCustomConfigurations(CommandLine cmd, String option) throws IOException { + static byte[] getCustomConfigurations(CommandLine cmd, String option) throws IOException { byte[] customConfig = null; if (cmd.hasOption(option)) { customConfig = Files.readAllBytes(Paths.get(cmd.getOptionValue(option))); diff --git a/xtable-utilities/src/test/java/org/apache/xtable/utilities/ITRunCatalogSync.java b/xtable-utilities/src/test/java/org/apache/xtable/utilities/ITRunCatalogSync.java new file mode 100644 index 00000000..52cff85a --- /dev/null +++ b/xtable-utilities/src/test/java/org/apache/xtable/utilities/ITRunCatalogSync.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.xtable.utilities; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import lombok.SneakyThrows; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import org.apache.hudi.common.model.HoodieTableType; + +import org.apache.xtable.GenericTable; +import org.apache.xtable.TestJavaHudiTable; +import org.apache.xtable.conversion.ExternalCatalogConfig; +import org.apache.xtable.model.storage.CatalogType; +import org.apache.xtable.testutil.ITTestUtils; +import org.apache.xtable.utilities.RunCatalogSync.DatasetConfig; +import org.apache.xtable.utilities.RunCatalogSync.DatasetConfig.SourceTableIdentifier; +import org.apache.xtable.utilities.RunCatalogSync.DatasetConfig.StorageIdentifier; +import org.apache.xtable.utilities.RunCatalogSync.DatasetConfig.TableIdentifier; +import org.apache.xtable.utilities.RunCatalogSync.DatasetConfig.TargetTableIdentifier; + +public class ITRunCatalogSync { + + private static final List<String> EXPECTED_FUNCTION_CALLS = + Arrays.asList( + "hasDatabase", + "createDatabase", + "getTable", + "getStorageLocation", + "createTable", + "getCatalogId"); + + @Test + void testCatalogSync(@TempDir Path tempDir) throws Exception { + String tableName = "test-table"; + try (GenericTable table = + TestJavaHudiTable.forStandardSchema( + tableName, tempDir, null, HoodieTableType.COPY_ON_WRITE)) { + table.insertRows(20); + File configFile = writeConfigFile(tempDir, table, tableName); + String[] args = new String[] {"-catalogConfig", configFile.getPath()}; + RunCatalogSync.main(args); + validateTargetMetadataIsPresent(table.getBasePath()); + Map<String, Integer> functionCalls = ITTestUtils.TestCatalogSyncImpl.getFunctionCalls(); + EXPECTED_FUNCTION_CALLS.forEach( + (function -> Assertions.assertEquals(2, functionCalls.get(function)))); + } + } + + private static File writeConfigFile(Path tempDir, GenericTable table, String tableName) + throws IOException { + DatasetConfig config = + DatasetConfig.builder() + .sourceCatalog( + ExternalCatalogConfig.builder() + .catalogId("source-catalog-1") + .catalogType(CatalogType.STORAGE) + .build()) + .targetCatalogs( + Collections.singletonList( + ExternalCatalogConfig.builder() + .catalogId("target-catalog-1") + .catalogSyncClientImpl(ITTestUtils.TestCatalogSyncImpl.class.getName()) + .build())) + .datasets( + Collections.singletonList( + DatasetConfig.Dataset.builder() + .sourceCatalogTableIdentifier( + SourceTableIdentifier.builder() + .storageIdentifier( + StorageIdentifier.builder() + .tableBasePath(table.getBasePath()) + .tableName(tableName) + .tableFormat("HUDI") + .build()) + .build()) + .targetCatalogTableIdentifiers( + Arrays.asList( + TargetTableIdentifier.builder() + .catalogId("target-catalog-1") + .tableFormat("DELTA") + .tableIdentifier( + TableIdentifier.builder() + .hierarchicalId("database-1.table-1") + .build()) + .build(), + TargetTableIdentifier.builder() + .catalogId("target-catalog-1") + .tableFormat("ICEBERG") + .tableIdentifier( + TableIdentifier.builder() + .hierarchicalId("catalog-2.database-2.table-2") + .build()) + .build())) + .build())) + .build(); + File configFile = new File(tempDir + "config.yaml"); + RunSync.YAML_MAPPER.writeValue(configFile, config); + return configFile; + } + + @SneakyThrows + 
private void validateTargetMetadataIsPresent(String basePath) { + Path icebergMetadataPath = Paths.get(URI.create(basePath + "/metadata")); + long icebergMetadataFiles = + Files.list(icebergMetadataPath).filter(p -> p.toString().endsWith("metadata.json")).count(); + Assertions.assertEquals(2, icebergMetadataFiles); + Path deltaMetadataPath = Paths.get(URI.create(basePath + "/_delta_log")); + long deltaMetadataFiles = + Files.list(deltaMetadataPath).filter(p -> p.toString().endsWith(".json")).count(); + Assertions.assertEquals(1, deltaMetadataFiles); + } +} diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java b/xtable-utilities/src/test/java/org/apache/xtable/utilities/TestRunCatalogSync.java similarity index 60% copy from xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java copy to xtable-utilities/src/test/java/org/apache/xtable/utilities/TestRunCatalogSync.java index d5d7a3c5..3f504ff7 100644 --- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java +++ b/xtable-utilities/src/test/java/org/apache/xtable/utilities/TestRunCatalogSync.java @@ -16,21 +16,23 @@ * limitations under the License. */ -package org.apache.xtable.iceberg; +package org.apache.xtable.utilities; -import java.util.Collections; -import java.util.Map; +import static org.junit.jupiter.api.Assertions.*; -import lombok.Builder; -import lombok.NonNull; -import lombok.Value; +import lombok.SneakyThrows; -import org.apache.xtable.conversion.CatalogConfig; +import org.junit.jupiter.api.Test; -@Value -@Builder -public class IcebergCatalogConfig implements CatalogConfig { - @NonNull String catalogImpl; - @NonNull String catalogName; - @NonNull @Builder.Default Map<String, String> catalogOptions = Collections.emptyMap(); +class TestRunCatalogSync { + + @SneakyThrows + @Test + void testMain() { + String catalogConfigYamlPath = + TestRunCatalogSync.class.getClassLoader().getResource("catalogConfig.yaml").getPath(); + String[] args = {"-catalogConfig", catalogConfigYamlPath}; + // Ensure yaml gets parsed without any errors. + assertDoesNotThrow(() -> RunCatalogSync.main(args)); + } } diff --git a/xtable-utilities/src/test/resources/catalogConfig.yaml b/xtable-utilities/src/test/resources/catalogConfig.yaml new file mode 100644 index 00000000..05b2df4b --- /dev/null +++ b/xtable-utilities/src/test/resources/catalogConfig.yaml @@ -0,0 +1,71 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +sourceCatalog: + catalogId: "source-1" + catalogConversionSourceImpl: "org.apache.xtable.testutil.ITTestUtils$TestCatalogConversionSourceImpl" + catalogSyncClientImpl: "org.apache.xtable.testutil.ITTestUtils$TestCatalogImpl" + catalogProperties: + key01: "value1" + key02: "value2" + key03: "value3" +targetCatalogs: + - catalogId: "target-1" + catalogSyncClientImpl: "org.apache.xtable.testutil.ITTestUtils$TestCatalogImpl" + catalogProperties: + key11: "value1" + key12: "value2" + key13: "value3" + - catalogId: "target-2" + catalogSyncClientImpl: "org.apache.xtable.testutil.ITTestUtils$TestCatalogImpl" + catalogProperties: + key21: "value1" + key22: "value2" + key23: "value3" + - catalogId: "target-3" + catalogSyncClientImpl: "org.apache.xtable.testutil.ITTestUtils$TestCatalogImpl" + catalogProperties: + key31: "value1" + key32: "value2" + key33: "value3" +datasets: + - sourceCatalogTableIdentifier: + tableIdentifier: + hierarchicalId: "source-database-1.source-1" + targetCatalogTableIdentifiers: + - catalogId: "target-1" + tableFormat: "DELTA" + tableIdentifier: + hierarchicalId: "target-database-1.target-tableName-1" + - catalogId: "target-2" + tableFormat: "HUDI" + tableIdentifier: + hierarchicalId: "target-database-2.target-tableName-2-delta" + - sourceCatalogTableIdentifier: + storageIdentifier: + tableBasePath: s3://tpc-ds-datasets/1GB/hudi/catalog_sales + tableName: catalog_sales + partitionSpec: cs_sold_date_sk:VALUE + tableFormat: "HUDI" + targetCatalogTableIdentifiers: + - catalogId: "target-2" + tableFormat: "ICEBERG" + tableIdentifier: + hierarchicalId: "target-database-2.target-tableName-2" + - catalogId: "target-3" + tableFormat: "HUDI" + tableIdentifier: + hierarchicalId: "default-catalog-2.target-database-3.target-tableName-3"
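
For reference, the sketch below shows the programmatic equivalent of the YAML-driven flow added in this commit: a ConversionConfig now carries a mapping from each TargetTable to the TargetCatalogConfigs it should be registered in, and ConversionController.syncTableAcrossCatalogs performs the format conversion and the catalog sync. This is a minimal sketch, not part of this commit; the catalog id, sync client class, table name, paths, and the Hudi provider class name are assumed values.

    // CatalogSyncExample.java -- illustrative sketch only.
    package org.apache.xtable.examples; // hypothetical package

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;

    import org.apache.xtable.conversion.ConversionConfig;
    import org.apache.xtable.conversion.ConversionController;
    import org.apache.xtable.conversion.ConversionSourceProvider;
    import org.apache.xtable.conversion.ExternalCatalogConfig;
    import org.apache.xtable.conversion.SourceTable;
    import org.apache.xtable.conversion.TargetCatalogConfig;
    import org.apache.xtable.conversion.TargetTable;
    import org.apache.xtable.model.catalog.ThreePartHierarchicalTableIdentifier;
    import org.apache.xtable.model.sync.SyncMode;
    import org.apache.xtable.reflection.ReflectionUtils;

    public class CatalogSyncExample {
      public static void main(String[] args) {
        Configuration hadoopConf = new Configuration();

        // Source table resolved directly from storage (no source catalog involved).
        SourceTable sourceTable =
            SourceTable.builder()
                .name("catalog_sales")                          // assumed table name
                .basePath("s3://my-bucket/hudi/catalog_sales")  // assumed base path
                .formatName("HUDI")
                .build();

        // Target table shares the source location but is materialized as Iceberg metadata.
        TargetTable targetTable =
            TargetTable.builder()
                .name(sourceTable.getName())
                .basePath(sourceTable.getBasePath())
                .formatName("ICEBERG")
                .build();

        // Catalog the Iceberg table should be registered in; the sync client class is hypothetical.
        ExternalCatalogConfig targetCatalog =
            ExternalCatalogConfig.builder()
                .catalogId("target-catalog-1")
                .catalogSyncClientImpl("com.example.MyCatalogSyncClient")
                .build();

        TargetCatalogConfig targetCatalogConfig =
            TargetCatalogConfig.builder()
                .catalogTableIdentifier(
                    ThreePartHierarchicalTableIdentifier.fromDotSeparatedIdentifier(
                        "my_database.catalog_sales"))
                .catalogConfig(targetCatalog)
                .build();

        // targetCatalogs maps each target table to the catalogs it should be registered in.
        ConversionConfig conversionConfig =
            ConversionConfig.builder()
                .sourceTable(sourceTable)
                .targetTables(Collections.singletonList(targetTable))
                .targetCatalogs(
                    Collections.singletonMap(
                        targetTable, Collections.singletonList(targetCatalogConfig)))
                .syncMode(SyncMode.INCREMENTAL)
                .build();

        // Conversion source providers are keyed by source format name; the provider class name
        // below is an assumption based on the default converter configuration.
        Map<String, ConversionSourceProvider> providers = new HashMap<>();
        ConversionSourceProvider<?> hudiProvider =
            ReflectionUtils.createInstanceOfClass(
                "org.apache.xtable.hudi.HudiConversionSourceProvider");
        hudiProvider.init(hadoopConf);
        providers.put("HUDI", hudiProvider);

        new ConversionController(hadoopConf).syncTableAcrossCatalogs(conversionConfig, providers);
      }
    }

RunCatalogSync performs these same steps for every dataset entry in a YAML file like the one above, resolving the source table either from the source catalog or from the storage identifier.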
