the-other-tim-brown commented on code in PR #591:
URL: https://github.com/apache/incubator-xtable/pull/591#discussion_r1874530616


##########
xtable-api/src/main/java/org/apache/xtable/model/exception/CatalogRefreshException.java:
##########
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.model.exception;
+
+public class CatalogRefreshException extends InternalException {

Review Comment:
   Let's add a quick javadoc explaining when this exception should be used
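
   For illustration, a minimal sketch of what such a javadoc could look like. The base class here is a stand-in so the snippet compiles on its own (the real `InternalException` constructor may differ), and the wording is an assumption inferred from how `CatalogSync` catches this exception around `refreshTable`.

```java
class CatalogRefreshJavadocSketch {
  /** Stand-in for XTable's InternalException; the real constructor may differ. */
  static class InternalException extends RuntimeException {
    InternalException(String message, Throwable cause) {
      super(message, cause);
    }
  }

  /**
   * Thrown when refreshing the metadata of a table that already exists in an external catalog
   * fails. It is not meant for failures while creating a database or a table.
   */
  static class CatalogRefreshException extends InternalException {
    CatalogRefreshException(String message, Throwable cause) {
      super(message, cause);
    }
  }
}
```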



##########
xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogUtils.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.spi.sync;
+
+import java.net.URI;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+
+public class CatalogUtils {
+  private static final List<String> S3_FS_SCHEMES = Arrays.asList("s3", "s3a", "s3n");
+
+  static boolean hasStorageDescriptorLocationChanged(
+      String storageDescriptorLocation, String tableBasePath) {
+
+    if (StringUtils.isEmpty(storageDescriptorLocation)) {
+      return true;
+    }
+    URI storageDescriptorUri = new Path(storageDescriptorLocation).toUri();
+    URI basePathUri = new Path(tableBasePath).toUri();
+
+    // In case of s3 path, compare without schemes
+    boolean includeScheme =

Review Comment:
   Is there a way to make this more generic so it is not specific to S3?
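
   One possible direction, as a rough sketch: keep groups of interchangeable schemes in one place and compare authority and path only when the two schemes fall into the same group. The group contents, class name, and method names below are assumptions for illustration, and it uses `java.net.URI` instead of Hadoop's `Path` so that it runs standalone.

```java
import java.net.URI;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

class LocationComparisonSketch {
  // Illustrative groups of schemes treated as interchangeable; the real list is a project decision.
  static final List<Set<String>> EQUIVALENT_SCHEMES =
      Arrays.asList(
          new HashSet<>(Arrays.asList("s3", "s3a", "s3n")),
          new HashSet<>(Arrays.asList("abfs", "abfss")),
          new HashSet<>(Arrays.asList("wasb", "wasbs")));

  static boolean hasLocationChanged(String catalogLocation, String tableBasePath) {
    if (catalogLocation == null || catalogLocation.isEmpty()) {
      return true;
    }
    URI catalogUri = URI.create(catalogLocation);
    URI basePathUri = URI.create(tableBasePath);
    // Changed if the schemes are not interchangeable, or if bucket/host or path differ.
    return !schemesEquivalent(catalogUri.getScheme(), basePathUri.getScheme())
        || !Objects.equals(catalogUri.getAuthority(), basePathUri.getAuthority())
        || !Objects.equals(
            stripTrailingSlash(catalogUri.getPath()), stripTrailingSlash(basePathUri.getPath()));
  }

  static boolean schemesEquivalent(String left, String right) {
    if (Objects.equals(left, right)) {
      return true;
    }
    for (Set<String> group : EQUIVALENT_SCHEMES) {
      if (group.contains(left) && group.contains(right)) {
        return true;
      }
    }
    return false;
  }

  static String stripTrailingSlash(String path) {
    if (path != null && path.length() > 1 && path.endsWith("/")) {
      return path.substring(0, path.length() - 1);
    }
    return path;
  }
}
```

   With this shape, supporting another storage system would only mean adding a group rather than touching the comparison logic.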



##########
xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSyncOperations.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.spi.sync;
+
+import org.apache.xtable.conversion.ExternalCatalog.TableIdentifier;
+import org.apache.xtable.model.InternalTable;
+import org.apache.xtable.model.catalog.CatalogType;
+
+public interface CatalogSyncOperations<DATABASE, TABLE> extends AutoCloseable {

Review Comment:
   Will implementations of this be "clients"? Wondering if there is a better 
name for this interface.



##########
xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSync.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.spi.sync;
+
+import static org.apache.xtable.spi.sync.CatalogUtils.hasStorageDescriptorLocationChanged;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import lombok.extern.log4j.Log4j2;
+
+import org.apache.commons.lang3.StringUtils;
+
+import org.apache.xtable.conversion.ExternalCatalog;
+import org.apache.xtable.model.InternalTable;
+import org.apache.xtable.model.exception.CatalogRefreshException;
+import org.apache.xtable.model.sync.SyncResult;
+import org.apache.xtable.model.sync.SyncResult.CatalogSyncStatus;
+
+@Log4j2
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class CatalogSync {
+  private static final CatalogSync INSTANCE = new CatalogSync();
+
+  public static CatalogSync getInstance() {
+    return INSTANCE;
+  }
+
+  public Map<String, List<CatalogSyncStatus>> syncTable(
+      Collection<CatalogSyncOperations> catalogSyncOperations, InternalTable table) {
+    Map<String, List<CatalogSyncStatus>> results = new HashMap<>();
+    for (CatalogSyncOperations catalogSyncOperation : catalogSyncOperations) {
+      results.computeIfAbsent(catalogSyncOperation.getTableFormat(), k -> new ArrayList<>());
+      results
+          .get(catalogSyncOperation.getTableFormat())
+          .add(getCatalogSyncStatus(catalogSyncOperation, table));
+    }
+    return results;
+  }
+
+  private CatalogSyncStatus getCatalogSyncStatus(
+      CatalogSyncOperations catalogSyncOperation, InternalTable table) {

Review Comment:
   ```suggestion
       private <DATABASE, TABLE> CatalogSyncStatus getCatalogSyncStatus(
         CatalogSyncOperations<DATABASE, TABLE> catalogSyncOperation, InternalTable table) {
   ```



##########
xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSync.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.spi.sync;
+
+import static org.apache.xtable.spi.sync.CatalogUtils.hasStorageDescriptorLocationChanged;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import lombok.extern.log4j.Log4j2;
+
+import org.apache.commons.lang3.StringUtils;
+
+import org.apache.xtable.conversion.ExternalCatalog;
+import org.apache.xtable.model.InternalTable;
+import org.apache.xtable.model.exception.CatalogRefreshException;
+import org.apache.xtable.model.sync.SyncResult;
+import org.apache.xtable.model.sync.SyncResult.CatalogSyncStatus;
+
+@Log4j2
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class CatalogSync {
+  private static final CatalogSync INSTANCE = new CatalogSync();
+
+  public static CatalogSync getInstance() {
+    return INSTANCE;
+  }
+
+  public Map<String, List<CatalogSyncStatus>> syncTable(
+      Collection<CatalogSyncOperations> catalogSyncOperations, InternalTable table) {
+    Map<String, List<CatalogSyncStatus>> results = new HashMap<>();
+    for (CatalogSyncOperations catalogSyncOperation : catalogSyncOperations) {
+      results.computeIfAbsent(catalogSyncOperation.getTableFormat(), k -> new ArrayList<>());
+      results
+          .get(catalogSyncOperation.getTableFormat())
+          .add(getCatalogSyncStatus(catalogSyncOperation, table));
+    }
+    return results;
+  }
+
+  private CatalogSyncStatus getCatalogSyncStatus(
+      CatalogSyncOperations catalogSyncOperation, InternalTable table) {
+    ExternalCatalog.TableIdentifier tableIdentifier = catalogSyncOperation.getTableIdentifier();
+    boolean doesDatabaseExists =
+        catalogSyncOperation.getDatabase(tableIdentifier.getDatabaseName()) != null;
+    if (!doesDatabaseExists) {
+      catalogSyncOperation.createDatabase(tableIdentifier.getDatabaseName());
+    }
+    Object catalogTable = catalogSyncOperation.getTable(tableIdentifier);
+    String storageDescriptorLocation =
+        catalogSyncOperation.getStorageDescriptorLocation(catalogTable);
+    if (catalogTable == null) {
+      catalogSyncOperation.createTable(table, tableIdentifier);
+    } else if (hasStorageDescriptorLocationChanged(
+        storageDescriptorLocation, table.getBasePath())) {
+      // Replace table if there is a mismatch between hmsTable location and Xtable basePath.
+      // Possible reasons could be:
+      //  1) hms table (manually) created with a different location before and need to be
+      // re-created with a new basePath
+      //  2) XTable basePath changes due to migration or other reasons
+      String oldLocation =
+          StringUtils.isEmpty(storageDescriptorLocation) ? "null" : storageDescriptorLocation;
+      log.warn(
+          "StorageDescriptor location changed from {} to {}, re-creating table",
+          oldLocation,
+          table.getBasePath());
+      catalogSyncOperation.createOrReplaceTable(table, tableIdentifier);
+    } else {
+      try {
+        log.debug("Table metadata changed, refreshing table");
+        catalogSyncOperation.refreshTable(table, catalogTable, tableIdentifier);
+      } catch (CatalogRefreshException e) {
+        log.warn("Table refresh failed, re-creating table", e);
+        catalogSyncOperation.createOrReplaceTable(table, tableIdentifier);

Review Comment:
   Can this cause disruption to consumers? If so, we may want to consider 
whether we want to simply retry the refreshTable operation on certain failure 
types.
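
   To make the idea concrete, a small sketch of retrying only failures that are flagged as retryable and leaving the fallback decision to the caller. The `retryable` flag, the helper name, and the attempt limit are hypothetical and not part of the PR; a real version would likely add backoff between attempts.

```java
import java.util.function.Predicate;

class RefreshRetrySketch {
  /** Stand-in for the PR's CatalogRefreshException, with a hypothetical retryable flag. */
  static class CatalogRefreshException extends RuntimeException {
    final boolean retryable;

    CatalogRefreshException(String message, boolean retryable) {
      super(message);
      this.retryable = retryable;
    }
  }

  /**
   * Runs the refresh up to maxAttempts times. Returns true once it succeeds, and false when a
   * failure is not retryable or the attempts are exhausted, so the caller decides on a fallback.
   */
  static boolean refreshWithRetry(
      Runnable refresh, Predicate<CatalogRefreshException> isRetryable, int maxAttempts) {
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        refresh.run();
        return true;
      } catch (CatalogRefreshException e) {
        if (!isRetryable.test(e)) {
          return false;
        }
        // Retryable failure: fall through to the next attempt (no backoff in this sketch).
      }
    }
    return false;
  }
}
```

   The caller would then only reach `createOrReplaceTable` when this returns false, which keeps the disruptive path as a last resort.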



##########
xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSyncOperations.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.spi.sync;
+
+import org.apache.xtable.conversion.ExternalCatalog.TableIdentifier;
+import org.apache.xtable.model.InternalTable;
+import org.apache.xtable.model.catalog.CatalogType;
+
+public interface CatalogSyncOperations<DATABASE, TABLE> extends AutoCloseable {
+  String getCatalogIdentifier();

Review Comment:
   Can you add docs for each method so it is clear what each should do?



##########
xtable-api/src/main/java/org/apache/xtable/model/sync/SyncResult.java:
##########
@@ -64,4 +67,27 @@ public static class SyncStatus {
     // Can the client retry for this type of error (Transient error=true, persistent error=false)
     boolean canRetryOnFailure;
   }
+
+  /** Represents status for catalog sync status operation */
+  @Value
+  @Builder
+  public static class CatalogSyncStatus {
+    // Catalog Identifier.
+    String catalogIdentifier;
+    // Status code
+    SyncStatusCode statusCode;
+    // errorDetails if any
+    ErrorDetails errorDetails;
+  }
+
+  @Value
+  @Builder
+  public static class ErrorDetails {

Review Comment:
   Should we update the `SyncStatus` to also use `ErrorDetails`?



##########
xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogUtils.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.spi.sync;
+
+import java.net.URI;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+
+public class CatalogUtils {

Review Comment:
   Can you add unit tests for the methods here?



##########
xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSync.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.spi.sync;
+
+import static org.apache.xtable.spi.sync.CatalogUtils.hasStorageDescriptorLocationChanged;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import lombok.extern.log4j.Log4j2;
+
+import org.apache.commons.lang3.StringUtils;
+
+import org.apache.xtable.conversion.ExternalCatalog;
+import org.apache.xtable.model.InternalTable;
+import org.apache.xtable.model.exception.CatalogRefreshException;
+import org.apache.xtable.model.sync.SyncResult;
+import org.apache.xtable.model.sync.SyncResult.CatalogSyncStatus;
+
+@Log4j2
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class CatalogSync {
+  private static final CatalogSync INSTANCE = new CatalogSync();
+
+  public static CatalogSync getInstance() {
+    return INSTANCE;
+  }
+
+  public Map<String, List<CatalogSyncStatus>> syncTable(
+      Collection<CatalogSyncOperations> catalogSyncOperations, InternalTable table) {
+    Map<String, List<CatalogSyncStatus>> results = new HashMap<>();
+    for (CatalogSyncOperations catalogSyncOperation : catalogSyncOperations) {

Review Comment:
   ```suggestion
       for (CatalogSyncOperations<?, ?> catalogSyncOperation : catalogSyncOperations) {
   ```



##########
xtable-api/src/main/java/org/apache/xtable/conversion/ExternalCatalog.java:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.conversion;
+
+import java.util.Map;
+
+import lombok.Builder;
+import lombok.NonNull;
+import lombok.Value;
+
+import org.apache.xtable.model.catalog.CatalogType;
+
+/**
+ * This class represents the configuration for an external catalog. It holds information required to
+ * interact with the catalog, such as its identifier, type, table formats to sync and other catalog
+ * specific properties related to permissions, catalog uris, access-tokens etc.
+ */
+@Value
+@Builder
+public class ExternalCatalog {
+  /**
+   * An identifier to be used for the catalog if there are multiple catalogs of the same type but in
+   * different accounts or regions.
+   */
+  @NonNull String catalogIdentifier;
+
+  /** The type of catalog. */
+  @NonNull CatalogType catalogType;
+
+  /**
+   * The table formats that will be synced to this catalog along with their {@link TableIdentifier}.
+   * Eg: ICEBERG -> {marketing, price}, HUDI -> {marketing, price_hudi}, DELTA -> {delta_tables,
+   * price}
+   */
+  @NonNull Map<String, TableIdentifier> tableFormatsToSync;

Review Comment:
   We have a concept of a `SourceTable` and a `TargetTable`, which both extend 
`ExternalTable`. Should we also have a base catalog class that we can use as a 
source of information in the future?
   



##########
xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSync.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.spi.sync;
+
+import static org.apache.xtable.spi.sync.CatalogUtils.hasStorageDescriptorLocationChanged;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import lombok.extern.log4j.Log4j2;
+
+import org.apache.commons.lang3.StringUtils;
+
+import org.apache.xtable.conversion.ExternalCatalog;
+import org.apache.xtable.model.InternalTable;
+import org.apache.xtable.model.exception.CatalogRefreshException;
+import org.apache.xtable.model.sync.SyncResult;
+import org.apache.xtable.model.sync.SyncResult.CatalogSyncStatus;
+
+@Log4j2
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class CatalogSync {
+  private static final CatalogSync INSTANCE = new CatalogSync();
+
+  public static CatalogSync getInstance() {
+    return INSTANCE;
+  }
+
+  public Map<String, List<CatalogSyncStatus>> syncTable(
+      Collection<CatalogSyncOperations> catalogSyncOperations, InternalTable table) {
+    Map<String, List<CatalogSyncStatus>> results = new HashMap<>();
+    for (CatalogSyncOperations catalogSyncOperation : catalogSyncOperations) {
+      results.computeIfAbsent(catalogSyncOperation.getTableFormat(), k -> new ArrayList<>());
+      results
+          .get(catalogSyncOperation.getTableFormat())
+          .add(getCatalogSyncStatus(catalogSyncOperation, table));
+    }
+    return results;
+  }
+
+  private CatalogSyncStatus getCatalogSyncStatus(
+      CatalogSyncOperations catalogSyncOperation, InternalTable table) {
+    ExternalCatalog.TableIdentifier tableIdentifier = catalogSyncOperation.getTableIdentifier();
+    boolean doesDatabaseExists =
+        catalogSyncOperation.getDatabase(tableIdentifier.getDatabaseName()) != null;
+    if (!doesDatabaseExists) {
+      catalogSyncOperation.createDatabase(tableIdentifier.getDatabaseName());
+    }
+    Object catalogTable = catalogSyncOperation.getTable(tableIdentifier);

Review Comment:
   ```suggestion
       TABLE catalogTable = catalogSyncOperation.getTable(tableIdentifier);
   ```
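
   The three generics suggestions fit together through wildcard capture: the loop iterates over `CatalogSyncOperations<?, ?>`, and a generic helper gives the captured types names so `TABLE` can be used for the local variable. A reduced sketch follows; the interface only has the two methods needed here, and `InMemoryOperations` and `HypotheticalTable` are made up for the example.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

class WildcardCaptureSketch {
  /** Reduced stand-in for CatalogSyncOperations; the DATABASE parameter is unused here. */
  interface CatalogSyncOperations<DATABASE, TABLE> {
    TABLE getTable(String tableName);

    String getStorageDescriptorLocation(TABLE table);
  }

  /** Hypothetical catalog-specific table type. */
  static class HypotheticalTable {
    final String location;

    HypotheticalTable(String location) {
      this.location = location;
    }
  }

  /** Hypothetical implementation that always returns a table at a fixed location. */
  static class InMemoryOperations implements CatalogSyncOperations<String, HypotheticalTable> {
    private final String location;

    InMemoryOperations(String location) {
      this.location = location;
    }

    @Override
    public HypotheticalTable getTable(String tableName) {
      return new HypotheticalTable(location);
    }

    @Override
    public String getStorageDescriptorLocation(HypotheticalTable table) {
      return table.location;
    }
  }

  static List<String> locations(
      Collection<CatalogSyncOperations<?, ?>> operations, String tableName) {
    List<String> result = new ArrayList<>();
    for (CatalogSyncOperations<?, ?> operation : operations) {
      // The wildcards are captured when calling the generic helper below.
      result.add(location(operation, tableName));
    }
    return result;
  }

  private static <DATABASE, TABLE> String location(
      CatalogSyncOperations<DATABASE, TABLE> operation, String tableName) {
    // TABLE is a real type here, so no Object or raw types are needed.
    TABLE table = operation.getTable(tableName);
    return operation.getStorageDescriptorLocation(table);
  }
}
```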



##########
xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSync.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.spi.sync;
+
+import static org.apache.xtable.spi.sync.CatalogUtils.hasStorageDescriptorLocationChanged;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import lombok.extern.log4j.Log4j2;
+
+import org.apache.commons.lang3.StringUtils;
+
+import org.apache.xtable.conversion.ExternalCatalog;
+import org.apache.xtable.model.InternalTable;
+import org.apache.xtable.model.exception.CatalogRefreshException;
+import org.apache.xtable.model.sync.SyncResult;
+import org.apache.xtable.model.sync.SyncResult.CatalogSyncStatus;
+
+@Log4j2
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class CatalogSync {
+  private static final CatalogSync INSTANCE = new CatalogSync();
+
+  public static CatalogSync getInstance() {
+    return INSTANCE;
+  }
+
+  public Map<String, List<CatalogSyncStatus>> syncTable(
+      Collection<CatalogSyncOperations> catalogSyncOperations, InternalTable table) {
+    Map<String, List<CatalogSyncStatus>> results = new HashMap<>();
+    for (CatalogSyncOperations catalogSyncOperation : catalogSyncOperations) {
+      results.computeIfAbsent(catalogSyncOperation.getTableFormat(), k -> new ArrayList<>());
+      results
+          .get(catalogSyncOperation.getTableFormat())
+          .add(getCatalogSyncStatus(catalogSyncOperation, table));
+    }
+    return results;
+  }
+
+  private CatalogSyncStatus getCatalogSyncStatus(
+      CatalogSyncOperations catalogSyncOperation, InternalTable table) {
+    ExternalCatalog.TableIdentifier tableIdentifier = catalogSyncOperation.getTableIdentifier();
+    boolean doesDatabaseExists =
+        catalogSyncOperation.getDatabase(tableIdentifier.getDatabaseName()) != null;
+    if (!doesDatabaseExists) {
+      catalogSyncOperation.createDatabase(tableIdentifier.getDatabaseName());
+    }
+    Object catalogTable = catalogSyncOperation.getTable(tableIdentifier);
+    String storageDescriptorLocation =
+        catalogSyncOperation.getStorageDescriptorLocation(catalogTable);
+    if (catalogTable == null) {
+      catalogSyncOperation.createTable(table, tableIdentifier);
+    } else if (hasStorageDescriptorLocationChanged(
+        storageDescriptorLocation, table.getBasePath())) {
+      // Replace table if there is a mismatch between hmsTable location and Xtable basePath.
+      // Possible reasons could be:
+      //  1) hms table (manually) created with a different location before and need to be
+      // re-created with a new basePath
+      //  2) XTable basePath changes due to migration or other reasons
+      String oldLocation =
+          StringUtils.isEmpty(storageDescriptorLocation) ? "null" : storageDescriptorLocation;
+      log.warn(
+          "StorageDescriptor location changed from {} to {}, re-creating table",
+          oldLocation,
+          table.getBasePath());
+      catalogSyncOperation.createOrReplaceTable(table, tableIdentifier);
+    } else {
+      try {
+        log.debug("Table metadata changed, refreshing table");
+        catalogSyncOperation.refreshTable(table, catalogTable, tableIdentifier);
+      } catch (CatalogRefreshException e) {
+        log.warn("Table refresh failed, re-creating table", e);
+        catalogSyncOperation.createOrReplaceTable(table, tableIdentifier);
+      }
+    }
+    return CatalogSyncStatus.builder()

Review Comment:
   Does this need to be wrapped in a try-catch so we can return a proper error 
status?
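
   Roughly what I have in mind, as a sketch with stand-in types: the sync steps run inside a try block, and any failure is turned into an error status instead of propagating. The enum values and the `ErrorDetails` field names below are assumptions for the example and may not match the real classes in `SyncResult`.

```java
class SyncStatusSketch {
  /** Stand-in status codes; the real SyncStatusCode may have different values. */
  enum SyncStatusCode {
    SUCCESS,
    ERROR
  }

  /** Stand-in for ErrorDetails; field names are hypothetical. */
  static class ErrorDetails {
    final String errorMessage;
    final String errorDescription;

    ErrorDetails(String errorMessage, String errorDescription) {
      this.errorMessage = errorMessage;
      this.errorDescription = errorDescription;
    }
  }

  /** Stand-in for CatalogSyncStatus with the three fields shown in the PR. */
  static class CatalogSyncStatus {
    final String catalogIdentifier;
    final SyncStatusCode statusCode;
    final ErrorDetails errorDetails;

    CatalogSyncStatus(
        String catalogIdentifier, SyncStatusCode statusCode, ErrorDetails errorDetails) {
      this.catalogIdentifier = catalogIdentifier;
      this.statusCode = statusCode;
      this.errorDetails = errorDetails;
    }
  }

  /** Runs the sync steps and always returns a status, even when the steps throw. */
  static CatalogSyncStatus syncSafely(String catalogIdentifier, Runnable syncSteps) {
    try {
      syncSteps.run();
      return new CatalogSyncStatus(catalogIdentifier, SyncStatusCode.SUCCESS, null);
    } catch (Exception e) {
      ErrorDetails details =
          new ErrorDetails(e.getMessage(), "Catalog sync failed for " + catalogIdentifier);
      return new CatalogSyncStatus(catalogIdentifier, SyncStatusCode.ERROR, details);
    }
  }
}
```

   This way one failing catalog would not prevent the remaining catalogs in `syncTable` from being synced and reported.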



##########
xtable-api/src/main/java/org/apache/xtable/model/catalog/CatalogType.java:
##########
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.model.catalog;
+
+// TODO: We can add more options here - GLUE, BIG_LAKE, UNITY etc. depending on the community
+// feedback.
+public enum CatalogType {

Review Comment:
   For the table formats, we decided to move away from an enum so that users 
could add their own implementations to the classpath without needing to modify 
the source code. For example, if someone had a private Paimon source, it would 
not simply error out since the enum value was missing from our main branch. 
   
   Should we do that here as well?
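
   A sketch of how that could look for catalogs: the type becomes a plain string, and implementations are looked up from whatever is on the classpath via `java.util.ServiceLoader`. The `CatalogSyncClientFactory` interface and the registry class are hypothetical names used only for this example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.ServiceLoader;

class CatalogTypeRegistrySketch {
  /** Hypothetical SPI that each catalog implementation would provide. */
  interface CatalogSyncClientFactory {
    /** String identifier for the catalog type, replacing the enum value. */
    String getCatalogType();
  }

  private final Map<String, CatalogSyncClientFactory> factoriesByType = new HashMap<>();

  CatalogTypeRegistrySketch(Iterable<CatalogSyncClientFactory> factories) {
    for (CatalogSyncClientFactory factory : factories) {
      factoriesByType.put(factory.getCatalogType(), factory);
    }
  }

  /** Discovers implementations registered under META-INF/services on the classpath. */
  static CatalogTypeRegistrySketch fromClasspath() {
    return new CatalogTypeRegistrySketch(ServiceLoader.load(CatalogSyncClientFactory.class));
  }

  CatalogSyncClientFactory get(String catalogType) {
    CatalogSyncClientFactory factory = factoriesByType.get(catalogType);
    if (factory == null) {
      throw new IllegalArgumentException(
          "No catalog implementation registered for type: " + catalogType);
    }
    return factory;
  }
}
```

   A user with a private catalog would then only need to ship a jar with their factory and a service entry, without any change to the enum in the main branch.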



##########
xtable-api/src/main/java/org/apache/xtable/conversion/ExternalCatalog.java:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.conversion;
+
+import java.util.Map;
+
+import lombok.Builder;
+import lombok.NonNull;
+import lombok.Value;
+
+import org.apache.xtable.model.catalog.CatalogType;
+
+/**
+ * This class represents the configuration for an external catalog. It holds information required to

Review Comment:
   Should we add some clarification that this is different from the Iceberg 
catalog you can configure in XTable currently?



##########
xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSync.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.spi.sync;
+
+import static org.apache.xtable.spi.sync.CatalogUtils.hasStorageDescriptorLocationChanged;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import lombok.extern.log4j.Log4j2;
+
+import org.apache.commons.lang3.StringUtils;
+
+import org.apache.xtable.conversion.ExternalCatalog;
+import org.apache.xtable.model.InternalTable;
+import org.apache.xtable.model.exception.CatalogRefreshException;
+import org.apache.xtable.model.sync.SyncResult;
+import org.apache.xtable.model.sync.SyncResult.CatalogSyncStatus;
+
+@Log4j2
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class CatalogSync {
+  private static final CatalogSync INSTANCE = new CatalogSync();
+
+  public static CatalogSync getInstance() {
+    return INSTANCE;
+  }
+
+  public Map<String, List<CatalogSyncStatus>> syncTable(
+      Collection<CatalogSyncOperations> catalogSyncOperations, InternalTable table) {
+    Map<String, List<CatalogSyncStatus>> results = new HashMap<>();
+    for (CatalogSyncOperations catalogSyncOperation : catalogSyncOperations) {
+      results.computeIfAbsent(catalogSyncOperation.getTableFormat(), k -> new ArrayList<>());
+      results
+          .get(catalogSyncOperation.getTableFormat())
+          .add(getCatalogSyncStatus(catalogSyncOperation, table));
+    }
+    return results;
+  }
+
+  private CatalogSyncStatus getCatalogSyncStatus(
+      CatalogSyncOperations catalogSyncOperation, InternalTable table) {
+    ExternalCatalog.TableIdentifier tableIdentifier = catalogSyncOperation.getTableIdentifier();
+    boolean doesDatabaseExists =
+        catalogSyncOperation.getDatabase(tableIdentifier.getDatabaseName()) != null;

Review Comment:
   Should the interface simply provide a `hasDatabase` instead of the 
`getDatabase`? Then we can remove the `DATABASE` generic type on the interface 
to simplify a bit.
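
   For example, a reduced sketch of the interface with a single type parameter. Only the two database methods are shown, and `InMemoryCatalog` and `ensureDatabase` are made up for the example.

```java
import java.util.HashSet;
import java.util.Set;

class HasDatabaseSketch {
  /** Reduced, hypothetical version of the interface with only the TABLE type parameter. */
  interface CatalogSyncOperations<TABLE> {
    boolean hasDatabase(String databaseName);

    void createDatabase(String databaseName);
  }

  /** Hypothetical in-memory implementation used to exercise the sketch. */
  static class InMemoryCatalog implements CatalogSyncOperations<Object> {
    final Set<String> databases = new HashSet<>();
    int createCalls = 0;

    @Override
    public boolean hasDatabase(String databaseName) {
      return databases.contains(databaseName);
    }

    @Override
    public void createDatabase(String databaseName) {
      createCalls++;
      databases.add(databaseName);
    }
  }

  /** Creates the database only when it is missing; no DATABASE object is ever needed. */
  static void ensureDatabase(CatalogSyncOperations<?> operations, String databaseName) {
    if (!operations.hasDatabase(databaseName)) {
      operations.createDatabase(databaseName);
    }
  }
}
```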



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@xtable.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
