This is an automated email from the ASF dual-hosted git repository. vinish pushed a commit to branch 590-CatalogSync-CoreLibrary in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git
commit 96daddef834bf8869c057f3799a1ecf1c3c5e49d Author: Vinish Reddy <[email protected]> AuthorDate: Tue Dec 10 00:25:56 2024 -0800 Add syncCatalogs in ConversionController --- .../apache/xtable/conversion/ConversionConfig.java | 9 +++- .../org/apache/xtable/model/sync/SyncResult.java | 2 +- .../xtable/spi/extractor/ConversionSource.java | 7 +++ .../org/apache/xtable/spi/sync/CatalogSync.java | 6 +-- .../xtable/catalog/ExternalCatalogConfig.java | 34 ++++++--------- .../conversion/CatalogSyncClientFactory.java | 51 ++++++++++++++++++++++ .../xtable/conversion/ConversionController.java | 51 +++++++++++++++++++++- .../apache/xtable/delta/DeltaConversionSource.java | 7 +++ .../apache/xtable/hudi/HudiConversionSource.java | 13 ++++++ .../xtable/iceberg/IcebergConversionSource.java | 7 +++ .../conversion/TestConversionController.java | 39 ++++++++++++++--- .../java/org/apache/xtable/utilities/RunSync.java | 48 ++++++++++++++++++++ 12 files changed, 242 insertions(+), 32 deletions(-) diff --git a/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java b/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java index 73e2628d..cab6bf1e 100644 --- a/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java +++ b/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java @@ -18,6 +18,7 @@ package org.apache.xtable.conversion; +import java.util.Collections; import java.util.List; import lombok.Builder; @@ -36,15 +37,21 @@ public class ConversionConfig { List<TargetTable> targetTables; // The mode, incremental or snapshot SyncMode syncMode; + // One or more external catalogs to sync the table metadata to + List<ExternalCatalog> externalCatalogs; @Builder ConversionConfig( - @NonNull SourceTable sourceTable, List<TargetTable> targetTables, SyncMode syncMode) { + @NonNull SourceTable sourceTable, + List<TargetTable> targetTables, + SyncMode syncMode, + List<ExternalCatalog> externalCatalogs) { this.sourceTable = sourceTable; this.targetTables = targetTables; Preconditions.checkArgument( targetTables != null && !targetTables.isEmpty(), "Please provide at-least one format to sync"); this.syncMode = syncMode == null ? SyncMode.INCREMENTAL : syncMode; + this.externalCatalogs = externalCatalogs == null ? Collections.emptyList() : externalCatalogs; } } diff --git a/xtable-api/src/main/java/org/apache/xtable/model/sync/SyncResult.java b/xtable-api/src/main/java/org/apache/xtable/model/sync/SyncResult.java index ef06b9ca..1d7ee8db 100644 --- a/xtable-api/src/main/java/org/apache/xtable/model/sync/SyncResult.java +++ b/xtable-api/src/main/java/org/apache/xtable/model/sync/SyncResult.java @@ -31,7 +31,7 @@ import lombok.Value; * @since 0.1 */ @Value -@Builder +@Builder(toBuilder = true) public class SyncResult { // Mode used for the sync SyncMode mode; diff --git a/xtable-api/src/main/java/org/apache/xtable/spi/extractor/ConversionSource.java b/xtable-api/src/main/java/org/apache/xtable/spi/extractor/ConversionSource.java index 2500454c..6baef45d 100644 --- a/xtable-api/src/main/java/org/apache/xtable/spi/extractor/ConversionSource.java +++ b/xtable-api/src/main/java/org/apache/xtable/spi/extractor/ConversionSource.java @@ -48,6 +48,13 @@ public interface ConversionSource<COMMIT> extends Closeable { */ InternalSnapshot getCurrentSnapshot(); + /** + * Extracts the {@link InternalTable} as of latest state. + * + * @return {@link InternalTable} representing the current state. + */ + InternalTable getCurrentTable(); + /** * Extracts a {@link TableChange} for the provided commit. * diff --git a/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSync.java b/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSync.java index 1b4dcb0c..601068f9 100644 --- a/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSync.java +++ b/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSync.java @@ -46,10 +46,10 @@ public class CatalogSync { return INSTANCE; } - public <TABLE> Map<String, List<CatalogSyncStatus>> syncTable( - Collection<CatalogSyncClient<TABLE>> catalogSyncClients, InternalTable table) { + public Map<String, List<CatalogSyncStatus>> syncTable( + Collection<CatalogSyncClient> catalogSyncClients, InternalTable table) { Map<String, List<CatalogSyncStatus>> results = new HashMap<>(); - for (CatalogSyncClient<TABLE> catalogSyncClient : catalogSyncClients) { + for (CatalogSyncClient<?> catalogSyncClient : catalogSyncClients) { results.computeIfAbsent(catalogSyncClient.getTableFormat(), k -> new ArrayList<>()); try { results diff --git a/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java b/xtable-core/src/main/java/org/apache/xtable/catalog/ExternalCatalogConfig.java similarity index 52% copy from xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java copy to xtable-core/src/main/java/org/apache/xtable/catalog/ExternalCatalogConfig.java index 73e2628d..410a6e38 100644 --- a/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java +++ b/xtable-core/src/main/java/org/apache/xtable/catalog/ExternalCatalogConfig.java @@ -16,35 +16,27 @@ * limitations under the License. */ -package org.apache.xtable.conversion; +package org.apache.xtable.catalog; -import java.util.List; +import java.util.Collections; +import java.util.Map; import lombok.Builder; import lombok.NonNull; import lombok.Value; -import com.google.common.base.Preconditions; - -import org.apache.xtable.model.sync.SyncMode; +import org.apache.xtable.conversion.CatalogConfig; @Value -public class ConversionConfig { - // The source of the sync - @NonNull SourceTable sourceTable; - // One or more targets to sync the table metadata to - List<TargetTable> targetTables; - // The mode, incremental or snapshot - SyncMode syncMode; +@Builder +public class ExternalCatalogConfig implements CatalogConfig { + @NonNull String catalogImpl; + @NonNull String catalogName; + @NonNull @Builder.Default Map<String, String> catalogOptions = Collections.emptyMap(); - @Builder - ConversionConfig( - @NonNull SourceTable sourceTable, List<TargetTable> targetTables, SyncMode syncMode) { - this.sourceTable = sourceTable; - this.targetTables = targetTables; - Preconditions.checkArgument( - targetTables != null && !targetTables.isEmpty(), - "Please provide at-least one format to sync"); - this.syncMode = syncMode == null ? SyncMode.INCREMENTAL : syncMode; + public static ExternalCatalogConfig fromCatalogType( + String catalogType, String catalogName, Map<String, String> catalogOptions) { + // TODO: Return ExternalCatalogConfig based on catalogType - HMS, Glue etc. + return ExternalCatalogConfig.builder().build(); } } diff --git a/xtable-core/src/main/java/org/apache/xtable/conversion/CatalogSyncClientFactory.java b/xtable-core/src/main/java/org/apache/xtable/conversion/CatalogSyncClientFactory.java new file mode 100644 index 00000000..249613f0 --- /dev/null +++ b/xtable-core/src/main/java/org/apache/xtable/conversion/CatalogSyncClientFactory.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.conversion; + +import java.util.List; +import java.util.stream.Collectors; + +import org.apache.hadoop.conf.Configuration; + +import org.apache.xtable.reflection.ReflectionUtils; +import org.apache.xtable.spi.sync.CatalogSyncClient; + +public class CatalogSyncClientFactory { + private static final CatalogSyncClientFactory INSTANCE = new CatalogSyncClientFactory(); + + public static CatalogSyncClientFactory getInstance() { + return INSTANCE; + } + + public List<CatalogSyncClient<?>> createForCatalog( + ExternalCatalog externalCatalog, Configuration configuration) { + return externalCatalog.getTableFormatsToSync().keySet().stream() + .map(tableFormat -> createForTableFormat(externalCatalog, tableFormat, configuration)) + .collect(Collectors.toList()); + } + + private CatalogSyncClient<?> createForTableFormat( + ExternalCatalog externalCatalog, String tableFormat, Configuration configuration) { + return ReflectionUtils.createInstanceOfClass( + externalCatalog.getCatalogConfig().getCatalogImpl(), + externalCatalog, + tableFormat, + configuration); + } +} diff --git a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java index dc665969..0d4c215b 100644 --- a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java +++ b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java @@ -44,8 +44,11 @@ import org.apache.xtable.model.InternalSnapshot; import org.apache.xtable.model.metadata.TableSyncMetadata; import org.apache.xtable.model.sync.SyncMode; import org.apache.xtable.model.sync.SyncResult; +import org.apache.xtable.model.sync.SyncResult.CatalogSyncStatus; import org.apache.xtable.spi.extractor.ConversionSource; import org.apache.xtable.spi.extractor.ExtractFromSource; +import org.apache.xtable.spi.sync.CatalogSync; +import org.apache.xtable.spi.sync.CatalogSyncClient; import org.apache.xtable.spi.sync.ConversionTarget; import org.apache.xtable.spi.sync.TableFormatSync; @@ -64,10 +67,17 @@ import org.apache.xtable.spi.sync.TableFormatSync; public class ConversionController { private final Configuration conf; private final ConversionTargetFactory conversionTargetFactory; + private final CatalogSyncClientFactory catalogSyncClientFactory; private final TableFormatSync tableFormatSync; + private final CatalogSync catalogSync; public ConversionController(Configuration conf) { - this(conf, ConversionTargetFactory.getInstance(), TableFormatSync.getInstance()); + this( + conf, + ConversionTargetFactory.getInstance(), + CatalogSyncClientFactory.getInstance(), + TableFormatSync.getInstance(), + CatalogSync.getInstance()); } /** @@ -134,6 +144,10 @@ public class ConversionController { if (!failedSyncs.isEmpty()) { log.error("Sync failed for the following formats {}", failedSyncs); } + // Sync to catalogs. + Map<String, List<CatalogSyncStatus>> catalogSyncResults = + syncCatalogs(config, conversionSourceProvider); + mergeSyncResults(syncResultsMerged, catalogSyncResults); return syncResultsMerged; } catch (IOException ioException) { throw new ReadException("Failed to close source converter", ioException); @@ -211,6 +225,22 @@ public class ConversionController { return SyncResultForTableFormats.builder().lastSyncResult(syncResultsByFormat).build(); } + private <COMMIT> Map<String, List<CatalogSyncStatus>> syncCatalogs( + ConversionConfig config, ConversionSourceProvider<COMMIT> conversionSourceProvider) { + List<CatalogSyncClient> catalogSyncClients = + config.getExternalCatalogs().stream() + .flatMap( + externalCatalog -> + catalogSyncClientFactory.createForCatalog(externalCatalog, conf).stream()) + .collect(Collectors.toList()); + try (ConversionSource<COMMIT> conversionSource = + conversionSourceProvider.getConversionSourceInstance(config.getSourceTable())) { + return catalogSync.syncTable(catalogSyncClients, conversionSource.getCurrentTable()); + } catch (IOException e) { + throw new ReadException("Failed to close source converter during syncCatalogs", e); + } + } + /** * Checks if incremental sync is sufficient for a target table format. * @@ -268,6 +298,25 @@ public class ConversionController { .build(); } + private void mergeSyncResults( + Map<String, SyncResult> syncResultsMerged, + Map<String, List<CatalogSyncStatus>> catalogSyncResults) { + catalogSyncResults.forEach( + (tableFormat, results) -> { + syncResultsMerged.computeIfPresent( + tableFormat, + (k, syncResult) -> syncResult.toBuilder().catalogSyncStatusList(results).build()); + syncResultsMerged.computeIfAbsent( + tableFormat, + k -> + SyncResult.builder() + .mode(SyncMode.INCREMENTAL) + .status(SyncResult.SyncStatus.SUCCESS) + .catalogSyncStatusList(results) + .build()); + }); + } + @Value @Builder private static class SyncResultForTableFormats { diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java index 19ecc02c..28e9ef2c 100644 --- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java +++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java @@ -92,6 +92,13 @@ public class DeltaConversionSource implements ConversionSource<Long> { .build(); } + @Override + public InternalTable getCurrentTable() { + DeltaLog deltaLog = DeltaLog.forTable(sparkSession, basePath); + Snapshot snapshot = deltaLog.snapshot(); + return getTable(snapshot.version()); + } + @Override public TableChange getTableChangeForCommit(Long versionNumber) { InternalTable tableAtVersion = tableExtractor.table(deltaLog, tableName, versionNumber); diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java index 1b1d0bf3..2993776c 100644 --- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java +++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java @@ -104,6 +104,19 @@ public class HudiConversionSource implements ConversionSource<HoodieInstant> { .build(); } + @Override + public InternalTable getCurrentTable() { + HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); + HoodieTimeline completedTimeline = activeTimeline.filterCompletedInstants(); + // get latest commit + HoodieInstant latestCommit = + completedTimeline + .lastInstant() + .orElseThrow( + () -> new ReadException("Unable to read latest commit from Hudi source table")); + return getTable(latestCommit); + } + @Override public TableChange getTableChangeForCommit(HoodieInstant hoodieInstantForDiff) { HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java index f96ec714..2d049d55 100644 --- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java +++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java @@ -160,6 +160,13 @@ public class IcebergConversionSource implements ConversionSource<Snapshot> { .build(); } + @Override + public InternalTable getCurrentTable() { + Table iceTable = getSourceTable(); + Snapshot currentSnapshot = iceTable.currentSnapshot(); + return getTable(currentSnapshot); + } + private InternalDataFile fromIceberg( DataFile file, PartitionSpec partitionSpec, InternalTable internalTable) { List<PartitionValue> partitionValues = diff --git a/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java index 652bbe42..e1bcb99a 100644 --- a/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java +++ b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java @@ -54,6 +54,7 @@ import org.apache.xtable.model.storage.TableFormat; import org.apache.xtable.model.sync.SyncMode; import org.apache.xtable.model.sync.SyncResult; import org.apache.xtable.spi.extractor.ConversionSource; +import org.apache.xtable.spi.sync.CatalogSync; import org.apache.xtable.spi.sync.ConversionTarget; import org.apache.xtable.spi.sync.TableFormatSync; @@ -65,7 +66,10 @@ public class TestConversionController { private final ConversionSource<Instant> mockConversionSource = mock(ConversionSource.class); private final ConversionTargetFactory mockConversionTargetFactory = mock(ConversionTargetFactory.class); + private final CatalogSyncClientFactory mockCatalogSyncClientFactory = + mock(CatalogSyncClientFactory.class); private final TableFormatSync tableFormatSync = mock(TableFormatSync.class); + private final CatalogSync catalogSync = mock(CatalogSync.class); private final ConversionTarget mockConversionTarget1 = mock(ConversionTarget.class); private final ConversionTarget mockConversionTarget2 = mock(ConversionTarget.class); @@ -96,7 +100,12 @@ public class TestConversionController { eq(internalSnapshot))) .thenReturn(perTableResults); ConversionController conversionController = - new ConversionController(mockConf, mockConversionTargetFactory, tableFormatSync); + new ConversionController( + mockConf, + mockConversionTargetFactory, + mockCatalogSyncClientFactory, + tableFormatSync, + catalogSync); Map<String, SyncResult> result = conversionController.sync(conversionConfig, mockConversionSourceProvider); assertEquals(perTableResults, result); @@ -182,7 +191,12 @@ public class TestConversionController { expectedSyncResult.put(TableFormat.ICEBERG, getLastSyncResult(icebergSyncResults)); expectedSyncResult.put(TableFormat.DELTA, getLastSyncResult(deltaSyncResults)); ConversionController conversionController = - new ConversionController(mockConf, mockConversionTargetFactory, tableFormatSync); + new ConversionController( + mockConf, + mockConversionTargetFactory, + mockCatalogSyncClientFactory, + tableFormatSync, + catalogSync); Map<String, SyncResult> result = conversionController.sync(conversionConfig, mockConversionSourceProvider); assertEquals(expectedSyncResult, result); @@ -226,7 +240,12 @@ public class TestConversionController { eq(internalSnapshot))) .thenReturn(syncResults); ConversionController conversionController = - new ConversionController(mockConf, mockConversionTargetFactory, tableFormatSync); + new ConversionController( + mockConf, + mockConversionTargetFactory, + mockCatalogSyncClientFactory, + tableFormatSync, + catalogSync); Map<String, SyncResult> result = conversionController.sync(conversionConfig, mockConversionSourceProvider); assertEquals(syncResults, result); @@ -310,7 +329,12 @@ public class TestConversionController { expectedSyncResult.put(TableFormat.ICEBERG, syncResult); expectedSyncResult.put(TableFormat.DELTA, getLastSyncResult(deltaSyncResults)); ConversionController conversionController = - new ConversionController(mockConf, mockConversionTargetFactory, tableFormatSync); + new ConversionController( + mockConf, + mockConversionTargetFactory, + mockCatalogSyncClientFactory, + tableFormatSync, + catalogSync); Map<String, SyncResult> result = conversionController.sync(conversionConfig, mockConversionSourceProvider); assertEquals(expectedSyncResult, result); @@ -368,7 +392,12 @@ public class TestConversionController { // Iceberg and Delta have no commits to sync Map<String, SyncResult> expectedSyncResult = Collections.emptyMap(); ConversionController conversionController = - new ConversionController(mockConf, mockConversionTargetFactory, tableFormatSync); + new ConversionController( + mockConf, + mockConversionTargetFactory, + mockCatalogSyncClientFactory, + tableFormatSync, + catalogSync); Map<String, SyncResult> result = conversionController.sync(conversionConfig, mockConversionSourceProvider); assertEquals(expectedSyncResult, result); diff --git a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java index c84753de..b3a352e8 100644 --- a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java +++ b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java @@ -37,6 +37,7 @@ import org.apache.commons.cli.DefaultParser; import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import com.fasterxml.jackson.annotation.JsonMerge; @@ -46,9 +47,12 @@ import com.fasterxml.jackson.databind.ObjectReader; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import com.google.common.annotations.VisibleForTesting; +import org.apache.xtable.catalog.ExternalCatalogConfig; import org.apache.xtable.conversion.ConversionConfig; import org.apache.xtable.conversion.ConversionController; import org.apache.xtable.conversion.ConversionSourceProvider; +import org.apache.xtable.conversion.ExternalCatalog; +import org.apache.xtable.conversion.ExternalCatalog.TableIdentifier; import org.apache.xtable.conversion.SourceTable; import org.apache.xtable.conversion.TargetTable; import org.apache.xtable.hudi.HudiSourceConfig; @@ -69,6 +73,7 @@ public class RunSync { private static final String HADOOP_CONFIG_PATH = "p"; private static final String CONVERTERS_CONFIG_PATH = "c"; private static final String ICEBERG_CATALOG_CONFIG_PATH = "i"; + private static final String EXTERNAL_CATALOG_CONFIG_PATH = "e"; private static final String HELP_OPTION = "h"; private static final Options OPTIONS = @@ -181,11 +186,38 @@ public class RunSync { .build()) .collect(Collectors.toList()); + List<ExternalCatalog> externalCatalogs = + datasetConfig.getCatalogs().stream() + .map( + catalog -> { + ExternalCatalog.ExternalCatalogBuilder builder = + ExternalCatalog.builder() + .catalogIdentifier(catalog.getCatalogIdentifier()) + .tableFormatsToSync(catalog.getCatalogTableIdentifiers()); + if (StringUtils.isEmpty(catalog.getCatalogImplClass())) { + builder.catalogConfig( + ExternalCatalogConfig.builder() + .catalogImpl(catalog.getCatalogImplClass()) + .catalogOptions(catalog.getCatalogProperties()) + .catalogName(catalog.getCatalogIdentifier()) + .build()); + } else { + builder.catalogConfig( + ExternalCatalogConfig.fromCatalogType( + catalog.getCatalogType(), + catalog.getCatalogIdentifier(), + catalog.getCatalogProperties())); + } + return builder.build(); + }) + .collect(Collectors.toList()); + ConversionConfig conversionConfig = ConversionConfig.builder() .sourceTable(sourceTable) .targetTables(targetTables) .syncMode(SyncMode.INCREMENTAL) + .externalCatalogs(externalCatalogs) .build(); try { conversionController.sync(conversionConfig, conversionSourceProvider); @@ -258,6 +290,9 @@ public class RunSync { /** Configuration of the dataset to sync, path, table name, etc. */ List<Table> datasets; + /** List of catalogs to sync */ + List<Catalog> catalogs; + @Data public static class Table { /** @@ -272,6 +307,19 @@ public class RunSync { String partitionSpec; String namespace; } + + @Data + public static class Catalog { + String catalogIdentifier; + + String catalogImplClass; + + String catalogType; + + Map<String, String> catalogProperties; + + Map<String, TableIdentifier> catalogTableIdentifiers; + } } @Data
