This is an automated email from the ASF dual-hosted git repository. vinish pushed a commit to branch 590-CatalogSync-API in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git
commit a49b3e6fd1515fe1090cd5ad93fed94da486f875 Author: Vinish Reddy <[email protected]> AuthorDate: Wed Dec 18 00:10:02 2024 -0800 [590] Add interface for CatalogSyncClient and CatalogSync --- pom.xml | 15 ++- xtable-api/pom.xml | 4 + .../apache/xtable/conversion/ConversionConfig.java | 10 +- ...SourceTable.java => ExternalCatalogConfig.java} | 36 +++--- .../org/apache/xtable/conversion/SourceTable.java | 2 +- .../TargetCatalogConfig.java} | 36 +++--- .../org/apache/xtable/conversion/TargetTable.java | 4 + .../catalog/CatalogTableIdentifier.java} | 40 +++---- ...ErrorCode.java => CatalogRefreshException.java} | 23 ++-- .../apache/xtable/model/exception/ErrorCode.java | 3 +- .../org/apache/xtable/model/sync/SyncResult.java | 28 ++++- .../extractor/CatalogConversionSource.java} | 31 ++--- .../xtable/spi/extractor/ConversionSource.java | 9 ++ .../org/apache/xtable/spi/sync/CatalogSync.java | 129 +++++++++++++++++++++ .../apache/xtable/spi/sync/CatalogSyncClient.java | 71 ++++++++++++ .../org/apache/xtable/spi/sync/CatalogUtils.java | 63 ++++++++++ .../apache/xtable/spi/sync/TableFormatSync.java | 13 ++- .../apache/xtable/spi/sync/TestCatalogSync.java | 127 ++++++++++++++++++++ .../apache/xtable/spi/sync/TestCatalogUtils.java | 72 ++++++++++++ .../xtable/spi/sync/TestTableFormatSync.java | 38 +++--- 20 files changed, 635 insertions(+), 119 deletions(-) diff --git a/pom.xml b/pom.xml index 99b4fe1a..7a597342 100644 --- a/pom.xml +++ b/pom.xml @@ -68,6 +68,7 @@ <maven-gpg-plugin.version>3.2.4</maven-gpg-plugin.version> <maven-deploy-plugin.version>3.1.1</maven-deploy-plugin.version> <maven-release-plugin.version>2.5.3</maven-release-plugin.version> + <mockito.version>5.2.0</mockito.version> <parquet.version>1.12.2</parquet.version> <protobuf.version>3.25.5</protobuf.version> <scala12.version>2.12.20</scala12.version> @@ -438,7 +439,19 @@ <dependency> <groupId>org.mockito</groupId> <artifactId>mockito-core</artifactId> - <version>4.8.0</version> + 
<version>${mockito.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-junit-jupiter</artifactId> + <scope>test</scope> + <version>${mockito.version}</version> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-inline</artifactId> + <version>${mockito.version}</version> <scope>test</scope> </dependency> diff --git a/xtable-api/pom.xml b/xtable-api/pom.xml index 9436b775..43aa7bac 100644 --- a/xtable-api/pom.xml +++ b/xtable-api/pom.xml @@ -84,5 +84,9 @@ <artifactId>mockito-core</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-junit-jupiter</artifactId> + </dependency> </dependencies> </project> diff --git a/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java b/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java index 73e2628d..8ef52741 100644 --- a/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java +++ b/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java @@ -19,6 +19,7 @@ package org.apache.xtable.conversion; import java.util.List; +import java.util.Map; import lombok.Builder; import lombok.NonNull; @@ -34,14 +35,21 @@ public class ConversionConfig { @NonNull SourceTable sourceTable; // One or more targets to sync the table metadata to List<TargetTable> targetTables; + // Each target table can be synced to multiple target catalogs, this is map from + // targetTableIdentifier to target catalogs. 
+ Map<String, List<TargetCatalogConfig>> targetCatalogs; // The mode, incremental or snapshot SyncMode syncMode; @Builder ConversionConfig( - @NonNull SourceTable sourceTable, List<TargetTable> targetTables, SyncMode syncMode) { + @NonNull SourceTable sourceTable, + List<TargetTable> targetTables, + Map<String, List<TargetCatalogConfig>> targetCatalogs, + SyncMode syncMode) { this.sourceTable = sourceTable; this.targetTables = targetTables; + this.targetCatalogs = targetCatalogs; Preconditions.checkArgument( targetTables != null && !targetTables.isEmpty(), "Please provide at-least one format to sync"); diff --git a/xtable-api/src/main/java/org/apache/xtable/conversion/SourceTable.java b/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalCatalogConfig.java similarity index 55% copy from xtable-api/src/main/java/org/apache/xtable/conversion/SourceTable.java copy to xtable-api/src/main/java/org/apache/xtable/conversion/ExternalCatalogConfig.java index b37e1c1e..16785ec6 100644 --- a/xtable-api/src/main/java/org/apache/xtable/conversion/SourceTable.java +++ b/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalCatalogConfig.java @@ -18,29 +18,25 @@ package org.apache.xtable.conversion; -import java.util.Properties; +import java.util.Collections; +import java.util.Map; import lombok.Builder; -import lombok.EqualsAndHashCode; -import lombok.Getter; import lombok.NonNull; +import lombok.Value; -@EqualsAndHashCode(callSuper = true) -@Getter -public class SourceTable extends ExternalTable { - /** The path to the data files, defaults to the metadataPath */ - @NonNull private final String dataPath; +/** Defines the configuration for an external catalog. 
*/ +@Value +@Builder +public class ExternalCatalogConfig implements CatalogConfig { + /** The name of the catalog, it also acts as a unique identifier for each catalog */ + @NonNull String catalogName; - @Builder(toBuilder = true) - public SourceTable( - String name, - String formatName, - String basePath, - String dataPath, - String[] namespace, - CatalogConfig catalogConfig, - Properties additionalProperties) { - super(name, formatName, basePath, namespace, catalogConfig, additionalProperties); - this.dataPath = dataPath == null ? this.getBasePath() : sanitizeBasePath(dataPath); - } + /** The implementation class path for the catalog */ + @NonNull String catalogImpl; + + /** + * The properties for each catalog, used for providing any custom behaviour during catalog sync + */ + @NonNull @Builder.Default Map<String, String> catalogOptions = Collections.emptyMap(); } diff --git a/xtable-api/src/main/java/org/apache/xtable/conversion/SourceTable.java b/xtable-api/src/main/java/org/apache/xtable/conversion/SourceTable.java index b37e1c1e..f3e1c359 100644 --- a/xtable-api/src/main/java/org/apache/xtable/conversion/SourceTable.java +++ b/xtable-api/src/main/java/org/apache/xtable/conversion/SourceTable.java @@ -28,7 +28,7 @@ import lombok.NonNull; @EqualsAndHashCode(callSuper = true) @Getter public class SourceTable extends ExternalTable { - /** The path to the data files, defaults to the metadataPath */ + /** The path to the data files, defaults to the basePath */ @NonNull private final String dataPath; @Builder(toBuilder = true) diff --git a/xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java b/xtable-api/src/main/java/org/apache/xtable/conversion/TargetCatalogConfig.java similarity index 56% copy from xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java copy to xtable-api/src/main/java/org/apache/xtable/conversion/TargetCatalogConfig.java index 920a95f4..ca6cec2d 100644 --- 
a/xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java +++ b/xtable-api/src/main/java/org/apache/xtable/conversion/TargetCatalogConfig.java @@ -16,25 +16,27 @@ * limitations under the License. */ -package org.apache.xtable.model.exception; +package org.apache.xtable.conversion; -import lombok.Getter; +import lombok.Builder; +import lombok.NonNull; +import lombok.Value; -@Getter -public enum ErrorCode { - INVALID_CONFIGURATION(10001), - INVALID_PARTITION_SPEC(10002), - INVALID_PARTITION_VALUE(10003), - READ_EXCEPTION(10004), - UPDATE_EXCEPTION(10005), - INVALID_SCHEMA(10006), - UNSUPPORTED_SCHEMA_TYPE(10007), - UNSUPPORTED_FEATURE(10008), - PARSE_EXCEPTION(10009); +import org.apache.xtable.model.catalog.CatalogTableIdentifier; - private final int errorCode; +/** + * TargetCatalogConfig contains the parameters that are required when syncing {@link TargetTable} to + * a catalog. + */ +@Value +@Builder +public class TargetCatalogConfig { + /** + * The tableIdentifiers(databaseName, tableName) that will be used when syncing {@link + * TargetTable} to the catalog. + */ + @NonNull CatalogTableIdentifier catalogTableIdentifier; - ErrorCode(int errorCode) { - this.errorCode = errorCode; - } + /** Configuration for the catalog. */ + @NonNull ExternalCatalogConfig catalogConfig; } diff --git a/xtable-api/src/main/java/org/apache/xtable/conversion/TargetTable.java b/xtable-api/src/main/java/org/apache/xtable/conversion/TargetTable.java index 6256da2c..7f503b75 100644 --- a/xtable-api/src/main/java/org/apache/xtable/conversion/TargetTable.java +++ b/xtable-api/src/main/java/org/apache/xtable/conversion/TargetTable.java @@ -44,4 +44,8 @@ public class TargetTable extends ExternalTable { this.metadataRetention = metadataRetention == null ? 
Duration.of(7, ChronoUnit.DAYS) : metadataRetention; } + + public String getId() { + return String.format("%s#%s", sanitizeBasePath(this.basePath), formatName); + } } diff --git a/xtable-api/src/main/java/org/apache/xtable/conversion/SourceTable.java b/xtable-api/src/main/java/org/apache/xtable/model/catalog/CatalogTableIdentifier.java similarity index 53% copy from xtable-api/src/main/java/org/apache/xtable/conversion/SourceTable.java copy to xtable-api/src/main/java/org/apache/xtable/model/catalog/CatalogTableIdentifier.java index b37e1c1e..483ace49 100644 --- a/xtable-api/src/main/java/org/apache/xtable/conversion/SourceTable.java +++ b/xtable-api/src/main/java/org/apache/xtable/model/catalog/CatalogTableIdentifier.java @@ -16,31 +16,29 @@ * limitations under the License. */ -package org.apache.xtable.conversion; - -import java.util.Properties; +package org.apache.xtable.model.catalog; import lombok.Builder; -import lombok.EqualsAndHashCode; -import lombok.Getter; import lombok.NonNull; +import lombok.Value; + +/** This class represents the unique identifier for a table in a catalog. */ +@Value +@Builder +public class CatalogTableIdentifier { + /** + * Catalogs have the ability to group tables logically, databaseName is the identifier for such + * logical classification. The alternate names for this field include namespace, schemaName etc. + */ + @NonNull String databaseName; -@EqualsAndHashCode(callSuper = true) -@Getter -public class SourceTable extends ExternalTable { - /** The path to the data files, defaults to the metadataPath */ - @NonNull private final String dataPath; + /** + * The table name used when syncing the table to the catalog. NOTE: This name can be different + * from the table name in storage. 
+ */ + @NonNull String tableName; - @Builder(toBuilder = true) - public SourceTable( - String name, - String formatName, - String basePath, - String dataPath, - String[] namespace, - CatalogConfig catalogConfig, - Properties additionalProperties) { - super(name, formatName, basePath, namespace, catalogConfig, additionalProperties); - this.dataPath = dataPath == null ? this.getBasePath() : sanitizeBasePath(dataPath); + public String getId() { + return String.format("%s-%s", databaseName, tableName); } } diff --git a/xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java b/xtable-api/src/main/java/org/apache/xtable/model/exception/CatalogRefreshException.java similarity index 67% copy from xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java copy to xtable-api/src/main/java/org/apache/xtable/model/exception/CatalogRefreshException.java index 920a95f4..dd322292 100644 --- a/xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java +++ b/xtable-api/src/main/java/org/apache/xtable/model/exception/CatalogRefreshException.java @@ -18,23 +18,14 @@ package org.apache.xtable.model.exception; -import lombok.Getter; +/** Exception thrown when refresh operation (updating table format metadata) in catalog fails. 
*/ +public class CatalogRefreshException extends InternalException { -@Getter -public enum ErrorCode { - INVALID_CONFIGURATION(10001), - INVALID_PARTITION_SPEC(10002), - INVALID_PARTITION_VALUE(10003), - READ_EXCEPTION(10004), - UPDATE_EXCEPTION(10005), - INVALID_SCHEMA(10006), - UNSUPPORTED_SCHEMA_TYPE(10007), - UNSUPPORTED_FEATURE(10008), - PARSE_EXCEPTION(10009); - - private final int errorCode; + public CatalogRefreshException(String message, Throwable e) { + super(ErrorCode.CATALOG_REFRESH_EXCEPTION, message, e); + } - ErrorCode(int errorCode) { - this.errorCode = errorCode; + public CatalogRefreshException(String message) { + super(ErrorCode.CATALOG_REFRESH_EXCEPTION, message); } } diff --git a/xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java b/xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java index 920a95f4..af85c900 100644 --- a/xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java +++ b/xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java @@ -30,7 +30,8 @@ public enum ErrorCode { INVALID_SCHEMA(10006), UNSUPPORTED_SCHEMA_TYPE(10007), UNSUPPORTED_FEATURE(10008), - PARSE_EXCEPTION(10009); + PARSE_EXCEPTION(10009), + CATALOG_REFRESH_EXCEPTION(10010); private final int errorCode; diff --git a/xtable-api/src/main/java/org/apache/xtable/model/sync/SyncResult.java b/xtable-api/src/main/java/org/apache/xtable/model/sync/SyncResult.java index d158b38c..390ab2e1 100644 --- a/xtable-api/src/main/java/org/apache/xtable/model/sync/SyncResult.java +++ b/xtable-api/src/main/java/org/apache/xtable/model/sync/SyncResult.java @@ -20,6 +20,7 @@ package org.apache.xtable.model.sync; import java.time.Duration; import java.time.Instant; +import java.util.List; import lombok.Builder; import lombok.Value; @@ -30,7 +31,7 @@ import lombok.Value; * @since 0.1 */ @Value -@Builder +@Builder(toBuilder = true) public class SyncResult { // Mode used for the sync SyncMode mode; @@ -38,10 +39,12 @@ 
public class SyncResult { Instant syncStartTime; // Duration Duration syncDuration; - // Status of the sync - SyncStatus status; + // Status of the tableFormat sync + SyncStatus tableFormatSyncStatus; // The Sync Mode recommended for the next sync (Usually filled on an error) SyncMode recommendedSyncMode; + // The sync status for each catalog. + List<CatalogSyncStatus> catalogSyncStatusList; public enum SyncStatusCode { SUCCESS, @@ -57,6 +60,25 @@ public class SyncResult { SyncStatus.builder().statusCode(SyncStatusCode.SUCCESS).build(); // Status code SyncStatusCode statusCode; + // errorDetails if any + ErrorDetails errorDetails; + } + + /** Represents status for catalog sync status operation */ + @Value + @Builder + public static class CatalogSyncStatus { + // Catalog Identifier. + String catalogName; + // Status code + SyncStatusCode statusCode; + // errorDetails if any + ErrorDetails errorDetails; + } + + @Value + @Builder + public static class ErrorDetails { // error Message if any String errorMessage; // Readable description of the error diff --git a/xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java b/xtable-api/src/main/java/org/apache/xtable/spi/extractor/CatalogConversionSource.java similarity index 55% copy from xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java copy to xtable-api/src/main/java/org/apache/xtable/spi/extractor/CatalogConversionSource.java index 920a95f4..b334baae 100644 --- a/xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java +++ b/xtable-api/src/main/java/org/apache/xtable/spi/extractor/CatalogConversionSource.java @@ -16,25 +16,18 @@ * limitations under the License. 
*/ -package org.apache.xtable.model.exception; +package org.apache.xtable.spi.extractor; -import lombok.Getter; +import org.apache.xtable.conversion.SourceTable; +import org.apache.xtable.model.catalog.CatalogTableIdentifier; -@Getter -public enum ErrorCode { - INVALID_CONFIGURATION(10001), - INVALID_PARTITION_SPEC(10002), - INVALID_PARTITION_VALUE(10003), - READ_EXCEPTION(10004), - UPDATE_EXCEPTION(10005), - INVALID_SCHEMA(10006), - UNSUPPORTED_SCHEMA_TYPE(10007), - UNSUPPORTED_FEATURE(10008), - PARSE_EXCEPTION(10009); - - private final int errorCode; - - ErrorCode(int errorCode) { - this.errorCode = errorCode; - } +/** + * A client for converting the table with tableIdentifier {@link CatalogTableIdentifier} in {@link + * org.apache.xtable.conversion.SourceCatalog} to SourceTable object. {@link SourceTable} can be + * used by downstream consumers for syncing it to multiple {@link + * org.apache.xtable.conversion.TargetTable} + */ +public interface CatalogConversionSource { + /** Returns the source table object present in the catalog. */ + SourceTable getSourceTable(CatalogTableIdentifier tableIdentifier); } diff --git a/xtable-api/src/main/java/org/apache/xtable/spi/extractor/ConversionSource.java b/xtable-api/src/main/java/org/apache/xtable/spi/extractor/ConversionSource.java index 2500454c..21f7f63f 100644 --- a/xtable-api/src/main/java/org/apache/xtable/spi/extractor/ConversionSource.java +++ b/xtable-api/src/main/java/org/apache/xtable/spi/extractor/ConversionSource.java @@ -41,6 +41,15 @@ public interface ConversionSource<COMMIT> extends Closeable { */ InternalTable getTable(COMMIT commit); + /** + * Extracts the {@link InternalTable} as of latest state. This method is less expensive as + * compared to {@link ConversionSource#getCurrentSnapshot()} as it doesn't load the files present + * in the table. + * + * @return {@link InternalTable} representing the current table. 
+ */ + InternalTable getCurrentTable(); + /** * Extracts the {@link InternalSnapshot} as of latest state. * diff --git a/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSync.java b/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSync.java new file mode 100644 index 00000000..0abdfd37 --- /dev/null +++ b/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSync.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.xtable.spi.sync; + +import static org.apache.xtable.spi.sync.CatalogUtils.hasStorageDescriptorLocationChanged; + +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import lombok.extern.log4j.Log4j2; + +import org.apache.commons.lang3.StringUtils; + +import org.apache.xtable.model.InternalTable; +import org.apache.xtable.model.catalog.CatalogTableIdentifier; +import org.apache.xtable.model.sync.SyncResult; +import org.apache.xtable.model.sync.SyncResult.CatalogSyncStatus; + +/** Provides the functionality to sync metadata from InternalTable to multiple target catalogs */ +@Log4j2 +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class CatalogSync { + private static final CatalogSync INSTANCE = new CatalogSync(); + + public static CatalogSync getInstance() { + return INSTANCE; + } + + public SyncResult syncTable( + Map<CatalogTableIdentifier, CatalogSyncClient> catalogSyncClients, InternalTable table) { + List<CatalogSyncStatus> results = new ArrayList<>(); + Instant startTime = Instant.now(); + catalogSyncClients.forEach( + ((tableIdentifier, catalogSyncClient) -> { + try { + results.add(syncCatalog(catalogSyncClient, tableIdentifier, table)); + log.info( + "Catalog sync is successful for table {} with format {} using catalogSync {}", + table.getBasePath(), + table.getTableFormat(), + catalogSyncClient.getClass().getName()); + } catch (Exception e) { + log.error( + "Catalog sync failed for table {} with format {} using catalogSync {}", + table.getBasePath(), + table.getTableFormat(), + catalogSyncClient.getClass().getName()); + results.add( + getCatalogSyncFailureStatus( + catalogSyncClient.getCatalogName(), catalogSyncClient.getClass().getName(), e)); + } + })); + return SyncResult.builder() + .lastInstantSynced(table.getLatestCommitTime()) + .syncStartTime(startTime) + 
.syncDuration(Duration.between(startTime, Instant.now())) + .catalogSyncStatusList(results) + .build(); + } + + private <TABLE> CatalogSyncStatus syncCatalog( + CatalogSyncClient<TABLE> catalogSyncClient, + CatalogTableIdentifier tableIdentifier, + InternalTable table) { + if (!catalogSyncClient.hasDatabase(tableIdentifier.getDatabaseName())) { + catalogSyncClient.createDatabase(tableIdentifier.getDatabaseName()); + } + TABLE catalogTable = catalogSyncClient.getTable(tableIdentifier); + String storageDescriptorLocation = catalogSyncClient.getStorageDescriptorLocation(catalogTable); + if (catalogTable == null) { + catalogSyncClient.createTable(table, tableIdentifier); + } else if (hasStorageDescriptorLocationChanged( + storageDescriptorLocation, table.getBasePath())) { + // Replace the table if its catalog location does not match the XTable basePath. + // Possible reasons could be: + // 1) the catalog table was (manually) created earlier with a different location and must be + // re-created with the new basePath + // 2) the XTable basePath changed due to migration or other reasons + String oldLocation = + StringUtils.isEmpty(storageDescriptorLocation) ?
"null" : storageDescriptorLocation; + log.warn( + "StorageDescriptor location changed from {} to {}, re-creating table", + oldLocation, + table.getBasePath()); + catalogSyncClient.createOrReplaceTable(table, tableIdentifier); + } else { + log.debug("Table metadata changed, refreshing table"); + catalogSyncClient.refreshTable(table, catalogTable, tableIdentifier); + } + return CatalogSyncStatus.builder() + .catalogName(catalogSyncClient.getCatalogName()) + .statusCode(SyncResult.SyncStatusCode.SUCCESS) + .build(); + } + + private CatalogSyncStatus getCatalogSyncFailureStatus( + String catalogName, String catalogImpl, Exception e) { + return CatalogSyncStatus.builder() + .catalogName(catalogName) + .statusCode(SyncResult.SyncStatusCode.ERROR) + .errorDetails( + SyncResult.ErrorDetails.builder() + .errorMessage(e.getMessage()) + .errorDescription("catalogSync failed for " + catalogImpl) + .build()) + .build(); + } +} diff --git a/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSyncClient.java b/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSyncClient.java new file mode 100644 index 00000000..6eb21ab6 --- /dev/null +++ b/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSyncClient.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.spi.sync; + +import org.apache.xtable.model.InternalTable; +import org.apache.xtable.model.catalog.CatalogTableIdentifier; + +/** + * An interface for syncing an {@link InternalTable} object to the {@code TABLE} object defined by + * the catalog. + * + * @param <TABLE> the catalog-specific table type, e.g. the HMS or Glue table model class + */ +public interface CatalogSyncClient<TABLE> extends AutoCloseable { + /** + * Returns a unique identifier for the catalog; this allows a user to sync a table to multiple + * catalogs of the same type, e.g. an HMS catalog at url1 and another HMS catalog at url2. + */ + String getCatalogName(); + + /** Returns the storage location of the table synced to the catalog. */ + String getStorageDescriptorLocation(TABLE table); + + /** Checks whether a database exists in the catalog. */ + boolean hasDatabase(String databaseName); + + /** Creates a database in the catalog. */ + void createDatabase(String databaseName); + + /** + * Returns the {@code TABLE} object used by the catalog implementation, e.g. HMS uses + * org.apache.hadoop.hive.metastore.api.Table, Glue uses + * software.amazon.awssdk.services.glue.model.Table etc. + */ + TABLE getTable(CatalogTableIdentifier tableIdentifier); + + /** + * Creates a table in the catalog using the canonical InternalTable representation and + * tableIdentifier. + */ + void createTable(InternalTable table, CatalogTableIdentifier tableIdentifier); + + /** Refreshes the table metadata in the catalog with tableIdentifier. */ + void refreshTable( + InternalTable table, TABLE catalogTable, CatalogTableIdentifier tableIdentifier); + + /** + * Tries to re-create the table in the catalog replacing state with the new canonical + * InternalTable representation and tableIdentifier. + */ + void createOrReplaceTable(InternalTable table, CatalogTableIdentifier tableIdentifier); + + /** Drops a table from the catalog.
*/ void dropTable(InternalTable table, CatalogTableIdentifier tableIdentifier); +} diff --git a/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogUtils.java b/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogUtils.java new file mode 100644 index 00000000..cd3d6d5d --- /dev/null +++ b/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogUtils.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.spi.sync; + +import java.net.URI; +import java.util.Objects; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.Path; + +import org.apache.xtable.model.exception.CatalogRefreshException; + +/** Utility methods used by CatalogSync. */ +public class CatalogUtils { + + /** + * Returns whether the table location in the catalog differs from the one currently in storage. + * + * @param storageDescriptorLocation location of the table in catalog. + * @param tableBasePath location of the table in source table. + * @return true if the locations differ (or the catalog location is empty), false otherwise.
+ */ + static boolean hasStorageDescriptorLocationChanged( + String storageDescriptorLocation, String tableBasePath) { + + if (StringUtils.isEmpty(storageDescriptorLocation)) { + return true; + } + URI storageDescriptorUri = new Path(storageDescriptorLocation).toUri(); + URI basePathUri = new Path(tableBasePath).toUri(); + + if (storageDescriptorUri.equals(basePathUri) + || storageDescriptorUri.getScheme() == null + || basePathUri.getScheme() == null + || storageDescriptorUri.getScheme().startsWith(basePathUri.getScheme()) + || basePathUri.getScheme().startsWith(storageDescriptorUri.getScheme())) { + String storageDescriptorLocationIdentifier = + storageDescriptorUri.getAuthority() + storageDescriptorUri.getPath(); + String tableBasePathIdentifier = basePathUri.getAuthority() + basePathUri.getPath(); + return !Objects.equals(storageDescriptorLocationIdentifier, tableBasePathIdentifier); + } + throw new CatalogRefreshException( + String.format( + "Storage scheme has changed for table catalogStorageDescriptorUri %s basePathUri %s", + storageDescriptorUri, basePathUri)); + } +} diff --git a/xtable-api/src/main/java/org/apache/xtable/spi/sync/TableFormatSync.java b/xtable-api/src/main/java/org/apache/xtable/spi/sync/TableFormatSync.java index 7cd0b384..bb340669 100644 --- a/xtable-api/src/main/java/org/apache/xtable/spi/sync/TableFormatSync.java +++ b/xtable-api/src/main/java/org/apache/xtable/spi/sync/TableFormatSync.java @@ -166,7 +166,7 @@ public class TableFormatSync { return SyncResult.builder() .mode(mode) - .status(SyncResult.SyncStatus.SUCCESS) + .tableFormatSyncStatus(SyncResult.SyncStatus.SUCCESS) .syncStartTime(startTime) .syncDuration(Duration.between(startTime, Instant.now())) .lastInstantSynced(tableState.getLatestCommitTime()) @@ -181,12 +181,15 @@ public class TableFormatSync { private SyncResult buildResultForError(SyncMode mode, Instant startTime, Exception e) { return SyncResult.builder() .mode(mode) - .status( + .tableFormatSyncStatus( 
SyncResult.SyncStatus.builder() .statusCode(SyncResult.SyncStatusCode.ERROR) - .errorMessage(e.getMessage()) - .errorDescription("Failed to sync " + mode.name()) - .canRetryOnFailure(true) + .errorDetails( + SyncResult.ErrorDetails.builder() + .errorMessage(e.getMessage()) + .errorDescription("Failed to sync " + mode.name()) + .canRetryOnFailure(true) + .build()) .build()) .syncStartTime(startTime) .syncDuration(Duration.between(startTime, Instant.now())) diff --git a/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestCatalogSync.java b/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestCatalogSync.java new file mode 100644 index 00000000..213ccbed --- /dev/null +++ b/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestCatalogSync.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.xtable.spi.sync; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import com.google.common.collect.ImmutableMap; + +import org.apache.xtable.model.InternalTable; +import org.apache.xtable.model.catalog.CatalogTableIdentifier; +import org.apache.xtable.model.schema.InternalField; +import org.apache.xtable.model.schema.InternalPartitionField; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.schema.PartitionTransformType; +import org.apache.xtable.model.sync.SyncResult; + +@ExtendWith(MockitoExtension.class) +public class TestCatalogSync<TABLE> { + + @Mock CatalogSyncClient<TABLE> mockClient1; + @Mock CatalogSyncClient<TABLE> mockClient2; + @Mock CatalogSyncClient<TABLE> mockClient3; + @Mock CatalogSyncClient<TABLE> mockClient4; + + private final CatalogTableIdentifier tableIdentifier1 = + CatalogTableIdentifier.builder().databaseName("database1").tableName("table1").build(); + private final CatalogTableIdentifier tableIdentifier2 = + CatalogTableIdentifier.builder().databaseName("database2").tableName("table2").build(); + private final CatalogTableIdentifier tableIdentifier3 = + CatalogTableIdentifier.builder().databaseName("database3").tableName("table3").build(); + private final CatalogTableIdentifier tableIdentifier4 = + CatalogTableIdentifier.builder().databaseName("database4").tableName("table4").build(); + + @Mock TABLE 
mockTable; + private final InternalTable internalTable = + InternalTable.builder() + .readSchema(InternalSchema.builder().name("test_schema").build()) + .partitioningFields( + Collections.singletonList( + InternalPartitionField.builder() + .sourceField(InternalField.builder().name("partition_field").build()) + .transformType(PartitionTransformType.VALUE) + .build())) + .latestCommitTime(Instant.now().minus(10, ChronoUnit.MINUTES)) + .basePath("/tmp/test") + .build(); + + @Test + void testSyncTable() { + when(mockClient1.hasDatabase("database1")).thenReturn(false); + when(mockClient2.hasDatabase("database2")).thenReturn(true); + when(mockClient3.hasDatabase("database3")).thenReturn(true); + when(mockClient4.hasDatabase("database4")) + .thenThrow(new UnsupportedOperationException("No catalog impl")); + + when(mockClient1.getTable(tableIdentifier1)).thenReturn(mockTable); + when(mockClient2.getTable(tableIdentifier2)).thenReturn(null); + when(mockClient3.getTable(tableIdentifier3)).thenReturn(mockTable); + + when(mockClient1.getStorageDescriptorLocation(any())).thenReturn("/tmp/test_changed"); + when(mockClient2.getStorageDescriptorLocation(any())).thenReturn("/tmp/test"); + when(mockClient3.getStorageDescriptorLocation(any())).thenReturn("/tmp/test"); + + when(mockClient4.getCatalogName()).thenReturn("catalogName4"); + + Map<CatalogTableIdentifier, CatalogSyncClient> catalogSyncClients = + ImmutableMap.of( + tableIdentifier1, mockClient1, + tableIdentifier2, mockClient2, + tableIdentifier3, mockClient3, + tableIdentifier4, mockClient4); + + List<SyncResult.CatalogSyncStatus> results = + CatalogSync.getInstance() + .syncTable(catalogSyncClients, internalTable) + .getCatalogSyncStatusList(); + List<SyncResult.CatalogSyncStatus> errorStatus = + results.stream() + .filter(status -> status.getStatusCode().equals(SyncResult.SyncStatusCode.ERROR)) + .collect(Collectors.toList()); + assertEquals(SyncResult.SyncStatusCode.ERROR, errorStatus.get(0).getStatusCode()); + 
assertEquals( + 3, + results.stream() + .map(SyncResult.CatalogSyncStatus::getStatusCode) + .filter(statusCode -> statusCode.equals(SyncResult.SyncStatusCode.SUCCESS)) + .count()); + + verify(mockClient1, times(1)).createDatabase("database1"); + verify(mockClient1, times(1)).createOrReplaceTable(internalTable, tableIdentifier1); + verify(mockClient2, times(1)).createTable(eq(internalTable), eq(tableIdentifier2)); + verify(mockClient3, times(1)).refreshTable(eq(internalTable), any(), eq(tableIdentifier3)); + } +} diff --git a/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestCatalogUtils.java b/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestCatalogUtils.java new file mode 100644 index 00000000..69575e8e --- /dev/null +++ b/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestCatalogUtils.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.xtable.spi.sync; + +import static org.apache.xtable.spi.sync.CatalogUtils.hasStorageDescriptorLocationChanged; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.util.stream.Stream; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import org.apache.xtable.model.exception.CatalogRefreshException; + +public class TestCatalogUtils { + + static Stream<Arguments> storageLocationTestArgs() { + return Stream.of( + Arguments.of("s3://bucket/table/v1", "s3://bucket/table/v2", true), + Arguments.of("s3://bucket/table1/v1", "s3://bucket/table2/v1", true), + Arguments.of("file:///var/lib/bucket/table/v1", "file:///var/lib/bucket/table/v2/", true), + Arguments.of("s3://bucket/table/v1", "s3://bucket/table/v1", false), + Arguments.of("s3a://bucket/table/v1", "s3://bucket/table/v1/", false), + Arguments.of("s3://bucket/table/v1", "s3a://bucket/table/v1", false), + Arguments.of("s3://bucket/table/v1/", "s3a://bucket/table/v1", false), + Arguments.of("/var/lib/bucket/table/v1", "/var/lib/bucket/table/v1/", false), + Arguments.of("file:///var/lib/bucket/table/v1", "file:///var/lib/bucket/table/v1/", false)); + } + + static Stream<Arguments> storageLocationTestArgsException() { + return Stream.of( + Arguments.of( + "s3://bucket/table/v1", + "gs://bucket/table/v1", + new CatalogRefreshException( + "Storage scheme has changed for table catalogStorageDescriptorUri s3://bucket/table/v1 basePathUri gs://bucket/table/v1"))); + } + + @ParameterizedTest + @MethodSource("storageLocationTestArgs") + void testHasStorageLocationChanged(String storageLocation, String basePath, boolean expected) { + assertEquals(expected, hasStorageDescriptorLocationChanged(storageLocation, basePath)); + } + + @ParameterizedTest + @MethodSource("storageLocationTestArgsException") + void 
testHasStorageLocationChangedException( + String storageLocation, String basePath, Exception exception) { + assertThrows( + exception.getClass(), + () -> hasStorageDescriptorLocationChanged(storageLocation, basePath), + exception.getMessage()); + } +} diff --git a/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestTableFormatSync.java b/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestTableFormatSync.java index 2a9e0588..39480f8b 100644 --- a/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestTableFormatSync.java +++ b/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestTableFormatSync.java @@ -88,7 +88,7 @@ public class TestTableFormatSync { assertEquals(2, result.size()); SyncResult successResult = result.get(TableFormat.DELTA); - assertEquals(SyncResult.SyncStatus.SUCCESS, successResult.getStatus()); + assertEquals(SyncResult.SyncStatus.SUCCESS, successResult.getTableFormatSyncStatus()); assertEquals(SyncMode.FULL, successResult.getMode()); assertEquals(startingTableState.getLatestCommitTime(), successResult.getLastInstantSynced()); assertSyncResultTimes(successResult, start); @@ -99,11 +99,14 @@ public class TestTableFormatSync { assertEquals( SyncResult.SyncStatus.builder() .statusCode(SyncResult.SyncStatusCode.ERROR) - .errorMessage("Failure") - .errorDescription("Failed to sync FULL") - .canRetryOnFailure(true) + .errorDetails( + SyncResult.ErrorDetails.builder() + .errorMessage("Failure") + .errorDescription("Failed to sync FULL") + .canRetryOnFailure(true) + .build()) .build(), - failureResult.getStatus()); + failureResult.getTableFormatSyncStatus()); verifyBaseConversionTargetCalls( mockConversionTarget2, startingTableState, pendingCommitInstants); @@ -168,7 +171,8 @@ public class TestTableFormatSync { assertEquals( tableChanges.get(0).getTableAsOfChange().getLatestCommitTime(), partialSuccessResults.get(0).getLastInstantSynced()); - assertEquals(SyncResult.SyncStatus.SUCCESS, partialSuccessResults.get(0).getStatus()); + assertEquals( 
+ SyncResult.SyncStatus.SUCCESS, partialSuccessResults.get(0).getTableFormatSyncStatus()); assertSyncResultTimes(partialSuccessResults.get(0), start); assertEquals(SyncMode.INCREMENTAL, partialSuccessResults.get(1).getMode()); @@ -176,11 +180,14 @@ public class TestTableFormatSync { assertEquals( SyncResult.SyncStatus.builder() .statusCode(SyncResult.SyncStatusCode.ERROR) - .errorMessage("Failure") - .errorDescription("Failed to sync INCREMENTAL") - .canRetryOnFailure(true) + .errorDetails( + SyncResult.ErrorDetails.builder() + .errorMessage("Failure") + .errorDescription("Failed to sync INCREMENTAL") + .canRetryOnFailure(true) + .build()) .build(), - partialSuccessResults.get(1).getStatus()); + partialSuccessResults.get(1).getTableFormatSyncStatus()); // Assert that all 3 changes are properly synced to the other format List<SyncResult> successResults = result.get(TableFormat.DELTA); @@ -190,7 +197,7 @@ public class TestTableFormatSync { assertEquals( tableChanges.get(i).getTableAsOfChange().getLatestCommitTime(), successResults.get(i).getLastInstantSynced()); - assertEquals(SyncResult.SyncStatus.SUCCESS, successResults.get(i).getStatus()); + assertEquals(SyncResult.SyncStatus.SUCCESS, successResults.get(i).getTableFormatSyncStatus()); assertSyncResultTimes(successResults.get(i), start); } @@ -257,7 +264,8 @@ public class TestTableFormatSync { assertEquals(2, conversionTarget1Results.size()); for (SyncResult conversionTarget1Result : conversionTarget1Results) { assertEquals(SyncMode.INCREMENTAL, conversionTarget1Result.getMode()); - assertEquals(SyncResult.SyncStatus.SUCCESS, conversionTarget1Result.getStatus()); + assertEquals( + SyncResult.SyncStatus.SUCCESS, conversionTarget1Result.getTableFormatSyncStatus()); assertSyncResultTimes(conversionTarget1Result, start); } assertEquals( @@ -275,7 +283,9 @@ public class TestTableFormatSync { assertEquals( tableChanges.get(i + 1).getTableAsOfChange().getLatestCommitTime(), 
conversionTarget2Results.get(i).getLastInstantSynced()); - assertEquals(SyncResult.SyncStatus.SUCCESS, conversionTarget2Results.get(i).getStatus()); + assertEquals( + SyncResult.SyncStatus.SUCCESS, + conversionTarget2Results.get(i).getTableFormatSyncStatus()); assertSyncResultTimes(conversionTarget2Results.get(i), start); } @@ -330,7 +340,7 @@ public class TestTableFormatSync { conversionTarget2Results.forEach( syncResult -> { assertEquals(SyncMode.INCREMENTAL, syncResult.getMode()); - assertEquals(SyncResult.SyncStatus.SUCCESS, syncResult.getStatus()); + assertEquals(SyncResult.SyncStatus.SUCCESS, syncResult.getTableFormatSyncStatus()); assertSyncResultTimes(syncResult, start); });
