the-other-tim-brown commented on code in PR #591: URL: https://github.com/apache/incubator-xtable/pull/591#discussion_r1874530616
########## xtable-api/src/main/java/org/apache/xtable/model/exception/CatalogRefreshException.java: ########## @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.model.exception; + +public class CatalogRefreshException extends InternalException { Review Comment: Let's add a quick javadoc explaining when this exception should be used ########## xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogUtils.java: ########## @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.spi.sync; + +import java.net.URI; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.Path; + +public class CatalogUtils { + private static final List<String> S3_FS_SCHEMES = Arrays.asList("s3", "s3a", "s3n"); + + static boolean hasStorageDescriptorLocationChanged( + String storageDescriptorLocation, String tableBasePath) { + + if (StringUtils.isEmpty(storageDescriptorLocation)) { + return true; + } + URI storageDescriptorUri = new Path(storageDescriptorLocation).toUri(); + URI basePathUri = new Path(tableBasePath).toUri(); + + // In case of s3 path, compare without schemes + boolean includeScheme = Review Comment: Is there a way to make this more generic so it is not specific to S3? ########## xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSyncOperations.java: ########## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.xtable.spi.sync; + +import org.apache.xtable.conversion.ExternalCatalog.TableIdentifier; +import org.apache.xtable.model.InternalTable; +import org.apache.xtable.model.catalog.CatalogType; + +public interface CatalogSyncOperations<DATABASE, TABLE> extends AutoCloseable { Review Comment: Will implementations of this be "clients"? Wondering if there is a better name for this interface. ########## xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSync.java: ########## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.xtable.spi.sync; + +import static org.apache.xtable.spi.sync.CatalogUtils.hasStorageDescriptorLocationChanged; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import lombok.extern.log4j.Log4j2; + +import org.apache.commons.lang3.StringUtils; + +import org.apache.xtable.conversion.ExternalCatalog; +import org.apache.xtable.model.InternalTable; +import org.apache.xtable.model.exception.CatalogRefreshException; +import org.apache.xtable.model.sync.SyncResult; +import org.apache.xtable.model.sync.SyncResult.CatalogSyncStatus; + +@Log4j2 +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class CatalogSync { + private static final CatalogSync INSTANCE = new CatalogSync(); + + public static CatalogSync getInstance() { + return INSTANCE; + } + + public Map<String, List<CatalogSyncStatus>> syncTable( + Collection<CatalogSyncOperations> catalogSyncOperations, InternalTable table) { + Map<String, List<CatalogSyncStatus>> results = new HashMap<>(); + for (CatalogSyncOperations catalogSyncOperation : catalogSyncOperations) { + results.computeIfAbsent(catalogSyncOperation.getTableFormat(), k -> new ArrayList<>()); + results + .get(catalogSyncOperation.getTableFormat()) + .add(getCatalogSyncStatus(catalogSyncOperation, table)); + } + return results; + } + + private CatalogSyncStatus getCatalogSyncStatus( + CatalogSyncOperations catalogSyncOperation, InternalTable table) { Review Comment: ```suggestion private <DATABASE, TABLE> CatalogSyncStatus getCatalogSyncStatus( CatalogSyncOperations<DATABASE, TABLE> catalogSyncOperation, InternalTable table) { ``` ########## xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSync.java: ########## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.spi.sync; + +import static org.apache.xtable.spi.sync.CatalogUtils.hasStorageDescriptorLocationChanged; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import lombok.extern.log4j.Log4j2; + +import org.apache.commons.lang3.StringUtils; + +import org.apache.xtable.conversion.ExternalCatalog; +import org.apache.xtable.model.InternalTable; +import org.apache.xtable.model.exception.CatalogRefreshException; +import org.apache.xtable.model.sync.SyncResult; +import org.apache.xtable.model.sync.SyncResult.CatalogSyncStatus; + +@Log4j2 +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class CatalogSync { + private static final CatalogSync INSTANCE = new CatalogSync(); + + public static CatalogSync getInstance() { + return INSTANCE; + } + + public Map<String, List<CatalogSyncStatus>> syncTable( + Collection<CatalogSyncOperations> catalogSyncOperations, InternalTable table) { + Map<String, List<CatalogSyncStatus>> results = new HashMap<>(); + for (CatalogSyncOperations catalogSyncOperation : catalogSyncOperations) { + results.computeIfAbsent(catalogSyncOperation.getTableFormat(), k -> new 
ArrayList<>()); + results + .get(catalogSyncOperation.getTableFormat()) + .add(getCatalogSyncStatus(catalogSyncOperation, table)); + } + return results; + } + + private CatalogSyncStatus getCatalogSyncStatus( + CatalogSyncOperations catalogSyncOperation, InternalTable table) { + ExternalCatalog.TableIdentifier tableIdentifier = catalogSyncOperation.getTableIdentifier(); + boolean doesDatabaseExists = + catalogSyncOperation.getDatabase(tableIdentifier.getDatabaseName()) != null; + if (!doesDatabaseExists) { + catalogSyncOperation.createDatabase(tableIdentifier.getDatabaseName()); + } + Object catalogTable = catalogSyncOperation.getTable(tableIdentifier); + String storageDescriptorLocation = + catalogSyncOperation.getStorageDescriptorLocation(catalogTable); + if (catalogTable == null) { + catalogSyncOperation.createTable(table, tableIdentifier); + } else if (hasStorageDescriptorLocationChanged( + storageDescriptorLocation, table.getBasePath())) { + // Replace table if there is a mismatch between hmsTable location and Xtable basePath. + // Possible reasons could be: + // 1) hms table (manually) created with a different location before and need to be + // re-created with a new basePath + // 2) XTable basePath changes due to migration or other reasons + String oldLocation = + StringUtils.isEmpty(storageDescriptorLocation) ? "null" : storageDescriptorLocation; + log.warn( + "StorageDescriptor location changed from {} to {}, re-creating table", + oldLocation, + table.getBasePath()); + catalogSyncOperation.createOrReplaceTable(table, tableIdentifier); + } else { + try { + log.debug("Table metadata changed, refreshing table"); + catalogSyncOperation.refreshTable(table, catalogTable, tableIdentifier); + } catch (CatalogRefreshException e) { + log.warn("Table refresh failed, re-creating table", e); + catalogSyncOperation.createOrReplaceTable(table, tableIdentifier); Review Comment: Can this cause disruption to consumers? 
If so, we may want to consider whether we want to simply retry the refreshTable operation on certain failure types. ########## xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSyncOperations.java: ########## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.spi.sync; + +import org.apache.xtable.conversion.ExternalCatalog.TableIdentifier; +import org.apache.xtable.model.InternalTable; +import org.apache.xtable.model.catalog.CatalogType; + +public interface CatalogSyncOperations<DATABASE, TABLE> extends AutoCloseable { + String getCatalogIdentifier(); Review Comment: Can you add docs for each method so it is clear what each should do? ########## xtable-api/src/main/java/org/apache/xtable/model/sync/SyncResult.java: ########## @@ -64,4 +67,27 @@ public static class SyncStatus { // Can the client retry for this type of error (Transient error=true, persistent error=false) boolean canRetryOnFailure; } + + /** Represents status for catalog sync status operation */ + @Value + @Builder + public static class CatalogSyncStatus { + // Catalog Identifier. 
+ String catalogIdentifier; + // Status code + SyncStatusCode statusCode; + // errorDetails if any + ErrorDetails errorDetails; + } + + @Value + @Builder + public static class ErrorDetails { Review Comment: Should we update the `SyncStatus` to also use `ErrorDetails`? ########## xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogUtils.java: ########## @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.spi.sync; + +import java.net.URI; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.Path; + +public class CatalogUtils { Review Comment: Can you add unit tests for the methods here? ########## xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSync.java: ########## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.spi.sync; + +import static org.apache.xtable.spi.sync.CatalogUtils.hasStorageDescriptorLocationChanged; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import lombok.extern.log4j.Log4j2; + +import org.apache.commons.lang3.StringUtils; + +import org.apache.xtable.conversion.ExternalCatalog; +import org.apache.xtable.model.InternalTable; +import org.apache.xtable.model.exception.CatalogRefreshException; +import org.apache.xtable.model.sync.SyncResult; +import org.apache.xtable.model.sync.SyncResult.CatalogSyncStatus; + +@Log4j2 +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class CatalogSync { + private static final CatalogSync INSTANCE = new CatalogSync(); + + public static CatalogSync getInstance() { + return INSTANCE; + } + + public Map<String, List<CatalogSyncStatus>> syncTable( + Collection<CatalogSyncOperations> catalogSyncOperations, InternalTable table) { + Map<String, List<CatalogSyncStatus>> results = new HashMap<>(); + for (CatalogSyncOperations catalogSyncOperation : catalogSyncOperations) { Review Comment: ```suggestion for (CatalogSyncOperations<?, ?> catalogSyncOperation : catalogSyncOperations) { ``` ########## 
xtable-api/src/main/java/org/apache/xtable/conversion/ExternalCatalog.java: ########## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.conversion; + +import java.util.Map; + +import lombok.Builder; +import lombok.NonNull; +import lombok.Value; + +import org.apache.xtable.model.catalog.CatalogType; + +/** + * This class represents the configuration for an external catalog. It holds information required to + * interact with the catalog, such as its identifier, type, table formats to sync and other catalog + * specific properties related to permissions, catalog uris, access-tokens etc. + */ +@Value +@Builder +public class ExternalCatalog { + /** + * An identifier to be used for the catalog if there are multiple catalogs of the same type but in + * different accounts or regions. + */ + @NonNull String catalogIdentifier; + + /** The type of catalog. */ + @NonNull CatalogType catalogType; + + /** + * The table formats that will be synced to this catalog along with their {@link TableIdentifier}. 
+ * Eg: ICEBERG -> {marketing, price}, HUDI -> {marketing, price_hudi}, DELTA -> {delta_tables, + * price} + */ + @NonNull Map<String, TableIdentifier> tableFormatsToSync; Review Comment: We have a concept of a `SourceTable` and `TargetTable` which both extend `ExternalTable`. Should we also have a base catalog class that we can use as a source of information as well in the future? ########## xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSync.java: ########## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.xtable.spi.sync; + +import static org.apache.xtable.spi.sync.CatalogUtils.hasStorageDescriptorLocationChanged; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import lombok.extern.log4j.Log4j2; + +import org.apache.commons.lang3.StringUtils; + +import org.apache.xtable.conversion.ExternalCatalog; +import org.apache.xtable.model.InternalTable; +import org.apache.xtable.model.exception.CatalogRefreshException; +import org.apache.xtable.model.sync.SyncResult; +import org.apache.xtable.model.sync.SyncResult.CatalogSyncStatus; + +@Log4j2 +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class CatalogSync { + private static final CatalogSync INSTANCE = new CatalogSync(); + + public static CatalogSync getInstance() { + return INSTANCE; + } + + public Map<String, List<CatalogSyncStatus>> syncTable( + Collection<CatalogSyncOperations> catalogSyncOperations, InternalTable table) { + Map<String, List<CatalogSyncStatus>> results = new HashMap<>(); + for (CatalogSyncOperations catalogSyncOperation : catalogSyncOperations) { + results.computeIfAbsent(catalogSyncOperation.getTableFormat(), k -> new ArrayList<>()); + results + .get(catalogSyncOperation.getTableFormat()) + .add(getCatalogSyncStatus(catalogSyncOperation, table)); + } + return results; + } + + private CatalogSyncStatus getCatalogSyncStatus( + CatalogSyncOperations catalogSyncOperation, InternalTable table) { + ExternalCatalog.TableIdentifier tableIdentifier = catalogSyncOperation.getTableIdentifier(); + boolean doesDatabaseExists = + catalogSyncOperation.getDatabase(tableIdentifier.getDatabaseName()) != null; + if (!doesDatabaseExists) { + catalogSyncOperation.createDatabase(tableIdentifier.getDatabaseName()); + } + Object catalogTable = catalogSyncOperation.getTable(tableIdentifier); Review Comment: ```suggestion TABLE catalogTable 
= catalogSyncOperation.getTable(tableIdentifier); ``` ########## xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSync.java: ########## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.spi.sync; + +import static org.apache.xtable.spi.sync.CatalogUtils.hasStorageDescriptorLocationChanged; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import lombok.extern.log4j.Log4j2; + +import org.apache.commons.lang3.StringUtils; + +import org.apache.xtable.conversion.ExternalCatalog; +import org.apache.xtable.model.InternalTable; +import org.apache.xtable.model.exception.CatalogRefreshException; +import org.apache.xtable.model.sync.SyncResult; +import org.apache.xtable.model.sync.SyncResult.CatalogSyncStatus; + +@Log4j2 +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class CatalogSync { + private static final CatalogSync INSTANCE = new CatalogSync(); + + public static CatalogSync getInstance() { + return INSTANCE; + } + + public Map<String, List<CatalogSyncStatus>> syncTable( + Collection<CatalogSyncOperations> 
catalogSyncOperations, InternalTable table) { + Map<String, List<CatalogSyncStatus>> results = new HashMap<>(); + for (CatalogSyncOperations catalogSyncOperation : catalogSyncOperations) { + results.computeIfAbsent(catalogSyncOperation.getTableFormat(), k -> new ArrayList<>()); + results + .get(catalogSyncOperation.getTableFormat()) + .add(getCatalogSyncStatus(catalogSyncOperation, table)); + } + return results; + } + + private CatalogSyncStatus getCatalogSyncStatus( + CatalogSyncOperations catalogSyncOperation, InternalTable table) { + ExternalCatalog.TableIdentifier tableIdentifier = catalogSyncOperation.getTableIdentifier(); + boolean doesDatabaseExists = + catalogSyncOperation.getDatabase(tableIdentifier.getDatabaseName()) != null; + if (!doesDatabaseExists) { + catalogSyncOperation.createDatabase(tableIdentifier.getDatabaseName()); + } + Object catalogTable = catalogSyncOperation.getTable(tableIdentifier); + String storageDescriptorLocation = + catalogSyncOperation.getStorageDescriptorLocation(catalogTable); + if (catalogTable == null) { + catalogSyncOperation.createTable(table, tableIdentifier); + } else if (hasStorageDescriptorLocationChanged( + storageDescriptorLocation, table.getBasePath())) { + // Replace table if there is a mismatch between hmsTable location and Xtable basePath. + // Possible reasons could be: + // 1) hms table (manually) created with a different location before and need to be + // re-created with a new basePath + // 2) XTable basePath changes due to migration or other reasons + String oldLocation = + StringUtils.isEmpty(storageDescriptorLocation) ? 
"null" : storageDescriptorLocation; + log.warn( + "StorageDescriptor location changed from {} to {}, re-creating table", + oldLocation, + table.getBasePath()); + catalogSyncOperation.createOrReplaceTable(table, tableIdentifier); + } else { + try { + log.debug("Table metadata changed, refreshing table"); + catalogSyncOperation.refreshTable(table, catalogTable, tableIdentifier); + } catch (CatalogRefreshException e) { + log.warn("Table refresh failed, re-creating table", e); + catalogSyncOperation.createOrReplaceTable(table, tableIdentifier); + } + } + return CatalogSyncStatus.builder() Review Comment: Does this need to be wrapped in a try-catch so we can return a proper error status? ########## xtable-api/src/main/java/org/apache/xtable/model/catalog/CatalogType.java: ########## @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.model.catalog; + +// TODO: We can add more options here - GLUE, BIG_LAKE, UNITY etc. depending on the community +// feedback. +public enum CatalogType { Review Comment: For the table formats, we decided to move away from an enum so that users could add their own implementations to the classpath without needing to modify the source code. 
For example, if someone had a private Paimon source, it would not simply error out since the enum value was missing from our main branch. Should we do that here as well? ########## xtable-api/src/main/java/org/apache/xtable/conversion/ExternalCatalog.java: ########## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.conversion; + +import java.util.Map; + +import lombok.Builder; +import lombok.NonNull; +import lombok.Value; + +import org.apache.xtable.model.catalog.CatalogType; + +/** + * This class represents the configuration for an external catalog. It holds information required to Review Comment: Should we add some clarification that this is different than the Iceberg catalog you can configure in XTable currently? ########## xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSync.java: ########## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.spi.sync; + +import static org.apache.xtable.spi.sync.CatalogUtils.hasStorageDescriptorLocationChanged; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import lombok.extern.log4j.Log4j2; + +import org.apache.commons.lang3.StringUtils; + +import org.apache.xtable.conversion.ExternalCatalog; +import org.apache.xtable.model.InternalTable; +import org.apache.xtable.model.exception.CatalogRefreshException; +import org.apache.xtable.model.sync.SyncResult; +import org.apache.xtable.model.sync.SyncResult.CatalogSyncStatus; + +@Log4j2 +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class CatalogSync { + private static final CatalogSync INSTANCE = new CatalogSync(); + + public static CatalogSync getInstance() { + return INSTANCE; + } + + public Map<String, List<CatalogSyncStatus>> syncTable( + Collection<CatalogSyncOperations> catalogSyncOperations, InternalTable table) { + Map<String, List<CatalogSyncStatus>> results = new HashMap<>(); + for (CatalogSyncOperations catalogSyncOperation : catalogSyncOperations) { + results.computeIfAbsent(catalogSyncOperation.getTableFormat(), k -> new ArrayList<>()); + results + .get(catalogSyncOperation.getTableFormat()) + .add(getCatalogSyncStatus(catalogSyncOperation, 
table)); + } + return results; + } + + private CatalogSyncStatus getCatalogSyncStatus( + CatalogSyncOperations catalogSyncOperation, InternalTable table) { + ExternalCatalog.TableIdentifier tableIdentifier = catalogSyncOperation.getTableIdentifier(); + boolean doesDatabaseExists = + catalogSyncOperation.getDatabase(tableIdentifier.getDatabaseName()) != null; Review Comment: Should the interface simply provide a `hasDatabase` method instead of `getDatabase`? Then we can remove the `DATABASE` generic type on the interface to simplify it a bit. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@xtable.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org