This is an automated email from the ASF dual-hosted git repository. vinish pushed a commit to branch 590-CatalogSync in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git
commit af817aab2876a5300480f55ca7d2e4f4a261d595 Author: Vinish Reddy <[email protected]> AuthorDate: Thu Dec 5 23:48:39 2024 -0800 [590] Add interface for CatalogSyncClient and CatalogSyncOperations --- pom.xml | 15 +- .../apache/xtable/conversion/ConversionConfig.java | 9 +- .../apache/xtable/conversion/ExternalCatalog.java | 79 +++++++++++ .../ErrorCode.java => catalog/CatalogType.java} | 26 +--- .../apache/xtable/model/exception/ErrorCode.java | 3 +- .../sync/CatalogSyncClient.java} | 24 +--- xtable-core/pom.xml | 11 ++ .../xtable/catalog/CatalogClientFactory.java | 35 ++--- .../xtable/catalog/CatalogSyncClientImpl.java | 95 +++++++++++++ .../xtable/catalog/CatalogSyncOperations.java | 47 +++++++ .../org/apache/xtable/catalog/CatalogUtils.java | 57 ++++++++ .../xtable/exception/CatalogRefreshException.java | 23 +--- .../xtable/catalog/TestCatalogSyncClientImpl.java | 152 +++++++++++++++++++++ .../apache/xtable/catalog/TestCatalogUtils.java | 51 +++++++ 14 files changed, 553 insertions(+), 74 deletions(-) diff --git a/pom.xml b/pom.xml index 99b4fe1a..fc903fc6 100644 --- a/pom.xml +++ b/pom.xml @@ -68,6 +68,7 @@ <maven-gpg-plugin.version>3.2.4</maven-gpg-plugin.version> <maven-deploy-plugin.version>3.1.1</maven-deploy-plugin.version> <maven-release-plugin.version>2.5.3</maven-release-plugin.version> + <mockito.version>4.8.0</mockito.version> <parquet.version>1.12.2</parquet.version> <protobuf.version>3.25.5</protobuf.version> <scala12.version>2.12.20</scala12.version> @@ -438,7 +439,19 @@ <dependency> <groupId>org.mockito</groupId> <artifactId>mockito-core</artifactId> - <version>4.8.0</version> + <version>${mockito.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-junit-jupiter</artifactId> + <scope>test</scope> + <version>${mockito.version}</version> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-inline</artifactId> + <version>${mockito.version}</version> <scope>test</scope> </dependency> diff --git a/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java b/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java index 73e2628d..cab6bf1e 100644 --- a/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java +++ b/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java @@ -18,6 +18,7 @@ package org.apache.xtable.conversion; +import java.util.Collections; import java.util.List; import lombok.Builder; @@ -36,15 +37,21 @@ public class ConversionConfig { List<TargetTable> targetTables; // The mode, incremental or snapshot SyncMode syncMode; + // One or more external catalogs to sync the table metadata to + List<ExternalCatalog> externalCatalogs; @Builder ConversionConfig( - @NonNull SourceTable sourceTable, List<TargetTable> targetTables, SyncMode syncMode) { + @NonNull SourceTable sourceTable, + List<TargetTable> targetTables, + SyncMode syncMode, + List<ExternalCatalog> externalCatalogs) { this.sourceTable = sourceTable; this.targetTables = targetTables; Preconditions.checkArgument( targetTables != null && !targetTables.isEmpty(), "Please provide at-least one format to sync"); this.syncMode = syncMode == null ? SyncMode.INCREMENTAL : syncMode; + this.externalCatalogs = externalCatalogs == null ? Collections.emptyList() : externalCatalogs; } } diff --git a/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalCatalog.java b/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalCatalog.java new file mode 100644 index 00000000..bf866106 --- /dev/null +++ b/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalCatalog.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.conversion; + +import java.util.Map; + +import lombok.Builder; +import lombok.NonNull; +import lombok.Value; + +import org.apache.xtable.model.catalog.CatalogType; + +/** + * This class represents the configuration for an external catalog. It holds information required to + * interact with the catalog, such as its identifier, type, table formats to sync and other catalog + * specific properties related to permissions, catalog uris, access-tokens etc. + */ +@Value +@Builder +public class ExternalCatalog { + /** + * An identifier to be used for the catalog if there are multiple catalogs of the same type but in + * different accounts or regions. + */ + @NonNull String catalogIdentifier; + + /** The type of catalog. */ + @NonNull CatalogType catalogType; + + /** + * The table formats that will be synced to this catalog along with their {@link TableIdentifier}. + * Eg: ICEBERG -> {marketing, price}, HUDI -> {marketing, price_hudi}, DELTA -> {delta_tables, + * price} + */ + @NonNull Map<String, TableIdentifier> tableFormatsToSync; + + /** + * These are properties specific for each catalog, it can be parsed into POJO for the specific + * catalog eg: GlueCatalogConfig + */ + @NonNull Map<String, String> catalogProperties; + + /** This class represents the unique identifier for a table in a catalog. */ + @Value + @Builder + public static class TableIdentifier { + /** + * Catalogs have the ability to group tables logically, databaseName is the identifier for such + * logical classification. The alternate names for this field include namespace, schemaName etc. + */ + @NonNull String databaseName; + + /** + * The table name used when syncing the table to the catalog. NOTE: This name can be different + * from the table name in storage. + */ + @NonNull String tableName; + + public String getId() { + return String.format("%s-%s", databaseName, tableName); + } + } +} diff --git a/xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java b/xtable-api/src/main/java/org/apache/xtable/model/catalog/CatalogType.java similarity index 63% copy from xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java copy to xtable-api/src/main/java/org/apache/xtable/model/catalog/CatalogType.java index 920a95f4..9fb6843a 100644 --- a/xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java +++ b/xtable-api/src/main/java/org/apache/xtable/model/catalog/CatalogType.java @@ -16,25 +16,11 @@ * limitations under the License. */ -package org.apache.xtable.model.exception; +package org.apache.xtable.model.catalog; -import lombok.Getter; - -@Getter -public enum ErrorCode { - INVALID_CONFIGURATION(10001), - INVALID_PARTITION_SPEC(10002), - INVALID_PARTITION_VALUE(10003), - READ_EXCEPTION(10004), - UPDATE_EXCEPTION(10005), - INVALID_SCHEMA(10006), - UNSUPPORTED_SCHEMA_TYPE(10007), - UNSUPPORTED_FEATURE(10008), - PARSE_EXCEPTION(10009); - - private final int errorCode; - - ErrorCode(int errorCode) { - this.errorCode = errorCode; - } +// TODO: We can add more options here - GLUE, BIG_LAKE, UNITY etc. depending on the community +// feedback. +public enum CatalogType { + /** Hive Metastore. */ + HMS } diff --git a/xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java b/xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java index 920a95f4..af85c900 100644 --- a/xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java +++ b/xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java @@ -30,7 +30,8 @@ public enum ErrorCode { INVALID_SCHEMA(10006), UNSUPPORTED_SCHEMA_TYPE(10007), UNSUPPORTED_FEATURE(10008), - PARSE_EXCEPTION(10009); + PARSE_EXCEPTION(10009), + CATALOG_REFRESH_EXCEPTION(10010); private final int errorCode; diff --git a/xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java b/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSyncClient.java similarity index 63% copy from xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java copy to xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSyncClient.java index 920a95f4..8dd5af72 100644 --- a/xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java +++ b/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSyncClient.java @@ -16,25 +16,13 @@ * limitations under the License. */ -package org.apache.xtable.model.exception; +package org.apache.xtable.spi.sync; -import lombok.Getter; +import org.apache.xtable.model.InternalTable; +import org.apache.xtable.model.catalog.CatalogType; -@Getter -public enum ErrorCode { - INVALID_CONFIGURATION(10001), - INVALID_PARTITION_SPEC(10002), - INVALID_PARTITION_VALUE(10003), - READ_EXCEPTION(10004), - UPDATE_EXCEPTION(10005), - INVALID_SCHEMA(10006), - UNSUPPORTED_SCHEMA_TYPE(10007), - UNSUPPORTED_FEATURE(10008), - PARSE_EXCEPTION(10009); +public interface CatalogSyncClient extends AutoCloseable { + void syncTable(InternalTable table); - private final int errorCode; - - ErrorCode(int errorCode) { - this.errorCode = errorCode; - } + CatalogType getCatalogType(); } diff --git a/xtable-core/pom.xml b/xtable-core/pom.xml index f277495e..8521dbe9 100644 --- a/xtable-core/pom.xml +++ b/xtable-core/pom.xml @@ -144,6 +144,17 @@ <artifactId>mockito-core</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-junit-jupiter</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-inline</artifactId> + <scope>test</scope> + </dependency> + <!-- Junit --> <dependency> diff --git a/xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java b/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogClientFactory.java similarity index 51% copy from xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java copy to xtable-core/src/main/java/org/apache/xtable/catalog/CatalogClientFactory.java index 920a95f4..a419d0d3 100644 --- a/xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java +++ b/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogClientFactory.java @@ -16,25 +16,28 @@ * limitations under the License. */ -package org.apache.xtable.model.exception; +package org.apache.xtable.catalog; -import lombok.Getter; +import lombok.AccessLevel; +import lombok.NoArgsConstructor; -@Getter -public enum ErrorCode { - INVALID_CONFIGURATION(10001), - INVALID_PARTITION_SPEC(10002), - INVALID_PARTITION_VALUE(10003), - READ_EXCEPTION(10004), - UPDATE_EXCEPTION(10005), - INVALID_SCHEMA(10006), - UNSUPPORTED_SCHEMA_TYPE(10007), - UNSUPPORTED_FEATURE(10008), - PARSE_EXCEPTION(10009); +import org.apache.xtable.conversion.ExternalCatalog; +import org.apache.xtable.model.catalog.CatalogType; +import org.apache.xtable.spi.sync.CatalogSyncClient; - private final int errorCode; +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class CatalogClientFactory { + private static final CatalogClientFactory INSTANCE = new CatalogClientFactory(); - ErrorCode(int errorCode) { - this.errorCode = errorCode; + public static CatalogClientFactory getInstance() { + return INSTANCE; + } + + public CatalogSyncClient createForCatalogAndFormat( + String tableFormat, ExternalCatalog externalCatalog) { + if (externalCatalog.getCatalogType() == CatalogType.HMS) { + // TODO: Create CatalogSyncOperations based on tableFormat and catalogType. + } + return null; } } diff --git a/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogSyncClientImpl.java b/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogSyncClientImpl.java new file mode 100644 index 00000000..ce2d6a27 --- /dev/null +++ b/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogSyncClientImpl.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.catalog; + +import static org.apache.xtable.catalog.CatalogUtils.hasStorageDescriptorLocationChanged; + +import lombok.extern.log4j.Log4j2; + +import org.apache.commons.lang3.StringUtils; + +import org.apache.xtable.conversion.ExternalCatalog; +import org.apache.xtable.exception.CatalogRefreshException; +import org.apache.xtable.model.InternalTable; +import org.apache.xtable.model.catalog.CatalogType; +import org.apache.xtable.spi.sync.CatalogSyncClient; + +@Log4j2 +public class CatalogSyncClientImpl<Database, Table> implements CatalogSyncClient { + + private final CatalogSyncOperations<Database, Table> operations; + private final CatalogType catalogType; + + public CatalogSyncClientImpl( + CatalogSyncOperations<Database, Table> operations, CatalogType catalogType) { + this.operations = operations; + this.catalogType = catalogType; + } + + @Override + public void syncTable(InternalTable table) { + ExternalCatalog.TableIdentifier tableIdentifier = operations.getTableIdentifier(); + boolean doesDatabaseExists = operations.getDatabase(tableIdentifier.getDatabaseName()) != null; + if (!doesDatabaseExists) { + operations.createDatabase(tableIdentifier.getDatabaseName()); + } + Table catalogTable = operations.getTable(tableIdentifier); + String storageDescriptorLocation = operations.getStorageDescriptorLocation(catalogTable); + if (catalogTable == null) { + operations.createTable(table, tableIdentifier); + } else if (hasStorageDescriptorLocationChanged( + storageDescriptorLocation, table.getBasePath())) { + // Replace table if there is a mismatch between hmsTable location and Xtable basePath. + // Possible reasons could be: + // 1) hms table (manually) created with a different location before and need to be + // re-created with a new basePath + // 2) XTable basePath changes due to migration or other reasons + String oldLocation = + StringUtils.isEmpty(storageDescriptorLocation) ? "null" : storageDescriptorLocation; + log.warn( + "StorageDescriptor location changed from {} to {}, re-creating table", + oldLocation, + table.getBasePath()); + operations.createOrReplaceTable(table, tableIdentifier); + } else { + try { + log.debug("Table metadata changed, refreshing table"); + operations.refreshTable(table, catalogTable, tableIdentifier); + } catch (CatalogRefreshException e) { + log.warn("Table refresh failed, re-creating table", e); + operations.createOrReplaceTable(table, tableIdentifier); + } + } + log.debug( + "{} {} catalog sync successful for: {}", + operations.getTableFormat(), + catalogType, + tableIdentifier.getId()); + } + + @Override + public CatalogType getCatalogType() { + return catalogType; + } + + @Override + public void close() throws Exception { + operations.close(); + } +} diff --git a/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogSyncOperations.java b/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogSyncOperations.java new file mode 100644 index 00000000..8c4ddb14 --- /dev/null +++ b/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogSyncOperations.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.catalog; + +import org.apache.xtable.conversion.ExternalCatalog.TableIdentifier; +import org.apache.xtable.exception.CatalogRefreshException; +import org.apache.xtable.model.InternalTable; + +public interface CatalogSyncOperations<DATABASE, TABLE> extends AutoCloseable { + + String getTableFormat(); + + TableIdentifier getTableIdentifier(); + + String getStorageDescriptorLocation(TABLE table); + + DATABASE getDatabase(String databaseName); + + void createDatabase(String databaseName); + + TABLE getTable(TableIdentifier tableIdentifier); + + void createTable(InternalTable table, TableIdentifier tableIdentifier); + + void refreshTable(InternalTable table, TABLE catalogTable, TableIdentifier tableIdentifier) + throws CatalogRefreshException; + + void createOrReplaceTable(InternalTable table, TableIdentifier tableIdentifier); + + void dropTable(InternalTable table, TableIdentifier tableIdentifier); +} diff --git a/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogUtils.java b/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogUtils.java new file mode 100644 index 00000000..ae911b64 --- /dev/null +++ b/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogUtils.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.catalog; + +import java.net.URI; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.Path; + +public class CatalogUtils { + + private static final List<String> S3_FS_SCHEMES = Arrays.asList("s3", "s3a", "s3n"); + + public static boolean hasStorageDescriptorLocationChanged( + String storageDescriptorLocation, String tableBasePath) { + + if (StringUtils.isEmpty(storageDescriptorLocation)) { + return true; + } + URI storageDescriptorUri = new Path(storageDescriptorLocation).toUri(); + URI basePathUri = new Path(tableBasePath).toUri(); + + // In case of s3 path, compare without schemes + boolean includeScheme = + !S3_FS_SCHEMES.contains(basePathUri.getScheme()) + || !S3_FS_SCHEMES.contains(storageDescriptorUri.getScheme()); + storageDescriptorLocation = getPath(storageDescriptorUri, includeScheme); + tableBasePath = getPath(basePathUri, includeScheme); + return !Objects.equals(storageDescriptorLocation, tableBasePath); + } + + private static String getPath(URI uri, boolean includeScheme) { + if (includeScheme) { + return uri.toString(); + } + return uri.getAuthority() + uri.getPath(); + } +} diff --git a/xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java b/xtable-core/src/main/java/org/apache/xtable/exception/CatalogRefreshException.java similarity index 64% copy from xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java copy to xtable-core/src/main/java/org/apache/xtable/exception/CatalogRefreshException.java index 920a95f4..d8fd5bd1 100644 --- a/xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java +++ b/xtable-core/src/main/java/org/apache/xtable/exception/CatalogRefreshException.java @@ -16,25 +16,14 @@ * limitations under the License. */ -package org.apache.xtable.model.exception; +package org.apache.xtable.exception; -import lombok.Getter; +import org.apache.xtable.model.exception.ErrorCode; +import org.apache.xtable.model.exception.InternalException; -@Getter -public enum ErrorCode { - INVALID_CONFIGURATION(10001), - INVALID_PARTITION_SPEC(10002), - INVALID_PARTITION_VALUE(10003), - READ_EXCEPTION(10004), - UPDATE_EXCEPTION(10005), - INVALID_SCHEMA(10006), - UNSUPPORTED_SCHEMA_TYPE(10007), - UNSUPPORTED_FEATURE(10008), - PARSE_EXCEPTION(10009); +public class CatalogRefreshException extends InternalException { - private final int errorCode; - - ErrorCode(int errorCode) { - this.errorCode = errorCode; + protected CatalogRefreshException(String message, Throwable e) { + super(ErrorCode.CATALOG_REFRESH_EXCEPTION, message, e); } } diff --git a/xtable-core/src/test/java/org/apache/xtable/catalog/TestCatalogSyncClientImpl.java b/xtable-core/src/test/java/org/apache/xtable/catalog/TestCatalogSyncClientImpl.java new file mode 100644 index 00000000..4b0905cb --- /dev/null +++ b/xtable-core/src/test/java/org/apache/xtable/catalog/TestCatalogSyncClientImpl.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.catalog; + +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import org.apache.xtable.conversion.ExternalCatalog; +import org.apache.xtable.model.InternalTable; +import org.apache.xtable.model.catalog.CatalogType; +import org.apache.xtable.spi.sync.CatalogSyncClient; + +@ExtendWith(MockitoExtension.class) +public class TestCatalogSyncClientImpl<Database, Table> { + + @Mock private CatalogSyncOperations<Database, Table> mockOperations; + @Mock private CatalogType mockCatalogType; + @Mock Table mockCatalogTable; + @Mock Database mockCatalogDb; + private CatalogSyncClient mockSyncClient; + + private static final String TEST_DATABASE = "hms_db"; + private static final String TEST_TABLE = "hms_table"; + private static final String BASE_PATH = "base-path"; + private static final InternalTable TEST_ONETABLE = + InternalTable.builder().basePath(BASE_PATH).build(); + private static final ExternalCatalog.TableIdentifier TEST_TABLE_IDENTIFIER = + ExternalCatalog.TableIdentifier.builder() + .databaseName(TEST_DATABASE) + .tableName(TEST_TABLE) + .build(); + + @BeforeEach + void setup() { + when(mockOperations.getTableIdentifier()).thenReturn(TEST_TABLE_IDENTIFIER); + mockSyncClient = new CatalogSyncClientImpl<>(mockOperations, mockCatalogType); + } + + @Test + void testSyncTable_DatabaseAndTableDoesNotExists() { + // mock database does not exists + when(mockOperations.getDatabase(TEST_DATABASE)).thenReturn(null); + // mock table does not exist + when(mockOperations.getTable(TEST_TABLE_IDENTIFIER)).thenReturn(null); + mockSyncClient.syncTable(TEST_ONETABLE); + + verifyCatalogOperation_DbApiCalls(1, 1); + verifyCatalogOperation_TableApiCalls(1, 1, 0, 0); + } + + @Test + void testSyncTable_DatabaseExistsButTableDoesNotExists() { + // mock databases exists + when(mockOperations.getDatabase(TEST_DATABASE)).thenReturn(mockCatalogDb); + // mock table does not exist + when(mockOperations.getTable(TEST_TABLE_IDENTIFIER)).thenReturn(null); + + mockSyncClient.syncTable(TEST_ONETABLE); + + verifyCatalogOperation_DbApiCalls(1, 0); + verifyCatalogOperation_TableApiCalls(1, 1, 0, 0); + } + + @Test + void testSyncTable_CreateOrReplaceTableDueToLocationMismatch() { + // mock databases exists + when(mockOperations.getDatabase(TEST_DATABASE)).thenReturn(mockCatalogDb); + // mock table exists + when(mockOperations.getTable(TEST_TABLE_IDENTIFIER)).thenReturn(mockCatalogTable); + when(mockOperations.getStorageDescriptorLocation(mockCatalogTable)) + .thenReturn("modified-location"); + + mockSyncClient.syncTable(TEST_ONETABLE); + + verifyCatalogOperation_DbApiCalls(1, 0); + verifyCatalogOperation_TableApiCalls(1, 0, 0, 1); + } + + @Test + void testSyncTable_RefreshTable() { + // mock databases exists + when(mockOperations.getDatabase(TEST_DATABASE)).thenReturn(mockCatalogDb); + // mock table exists + when(mockOperations.getTable(TEST_TABLE_IDENTIFIER)).thenReturn(mockCatalogTable); + when(mockOperations.getStorageDescriptorLocation(mockCatalogTable)) + .thenReturn(BASE_PATH); + + mockSyncClient.syncTable(TEST_ONETABLE); + + verifyCatalogOperation_DbApiCalls(1, 0); + verifyCatalogOperation_TableApiCalls(1, 0, 1, 0); + } + + @Test + void testSyncTable_FailureWhenUpdatingTable() { + // mock databases exists + when(mockOperations.getDatabase(TEST_DATABASE)).thenReturn(mockCatalogDb); + // mock table exists + when(mockOperations.getTable(TEST_TABLE_IDENTIFIER)).thenReturn(mockCatalogTable); + when(mockOperations.getStorageDescriptorLocation(mockCatalogTable)) + .thenReturn(BASE_PATH); + doThrow(new RuntimeException("something went wrong")) + .when(mockOperations) + .refreshTable(TEST_ONETABLE, mockCatalogTable, TEST_TABLE_IDENTIFIER); + + assertThrows(RuntimeException.class, () -> mockSyncClient.syncTable(TEST_ONETABLE)); + + verifyCatalogOperation_DbApiCalls(1, 0); + verifyCatalogOperation_TableApiCalls(1, 0, 1, 0); + } + + private void verifyCatalogOperation_DbApiCalls(int getDbCount, int createDbCount) { + verify(mockOperations, times(getDbCount)).getDatabase(TEST_DATABASE); + verify(mockOperations, times(createDbCount)).createDatabase(TEST_DATABASE); + } + + private void verifyCatalogOperation_TableApiCalls( + int getTableCount, int createTableCount, int refreshTable, int createOrReplaceTable) { + verify(mockOperations, times(getTableCount)).getTable(TEST_TABLE_IDENTIFIER); + verify(mockOperations, times(createTableCount)) + .createTable(TEST_ONETABLE, TEST_TABLE_IDENTIFIER); + verify(mockOperations, times(refreshTable)) + .refreshTable(TEST_ONETABLE, mockCatalogTable, TEST_TABLE_IDENTIFIER); + verify(mockOperations, times(createOrReplaceTable)) + .createOrReplaceTable(TEST_ONETABLE, TEST_TABLE_IDENTIFIER); + } +} diff --git a/xtable-core/src/test/java/org/apache/xtable/catalog/TestCatalogUtils.java b/xtable-core/src/test/java/org/apache/xtable/catalog/TestCatalogUtils.java new file mode 100644 index 00000000..d2270fc1 --- /dev/null +++ b/xtable-core/src/test/java/org/apache/xtable/catalog/TestCatalogUtils.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.catalog; + +import static org.apache.xtable.catalog.CatalogUtils.hasStorageDescriptorLocationChanged; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.stream.Stream; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +public class TestCatalogUtils { + + static Stream<Arguments> storageLocationTestArgs() { + return Stream.of( + Arguments.of("s3://bucket/table/v1", "s3://bucket/table/v2", true), + Arguments.of("s3://bucket/table1/v1", "s3://bucket/table2/v1", true), + Arguments.of("s3://bucket/table/v1", "gs://bucket/table/v1", true), + Arguments.of("file:///var/lib/bucket/table/v1", "file:///var/lib/bucket/table/v2/", true), + Arguments.of("s3://bucket/table/v1", "s3://bucket/table/v1", false), + Arguments.of("s3a://bucket/table/v1", "s3://bucket/table/v1/", false), + Arguments.of("s3://bucket/table/v1", "s3a://bucket/table/v1", false), + Arguments.of("s3://bucket/table/v1/", "s3a://bucket/table/v1", false), + Arguments.of("/var/lib/bucket/table/v1", "/var/lib/bucket/table/v1/", false), + Arguments.of("file:///var/lib/bucket/table/v1", "file:///var/lib/bucket/table/v1/", false)); + } + + @ParameterizedTest + @MethodSource("storageLocationTestArgs") + void testHasStorageLocationChanged(String storageLocation, String basePath, boolean expected) { + assertEquals(expected, hasStorageDescriptorLocationChanged(storageLocation, basePath)); + } +}
