the-other-tim-brown commented on code in PR #591:
URL: https://github.com/apache/incubator-xtable/pull/591#discussion_r1889188001


##########
xtable-api/src/main/java/org/apache/xtable/spi/extractor/ConversionSource.java:
##########
@@ -41,6 +41,13 @@ public interface ConversionSource<COMMIT> extends Closeable {
    */
   InternalTable getTable(COMMIT commit);
 
+  /**
+   * Extracts the {@link InternalTable} as of latest state.

Review Comment:
   Maybe include some context for future callers that this is less expensive than `getCurrentSnapshot`, which loads file details?



##########
xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSyncClient.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.spi.sync;
+
+import org.apache.xtable.model.InternalTable;
+import org.apache.xtable.model.catalog.CatalogTableIdentifier;
+
+/**
+ * An interface for syncing {@link InternalTable} object to {@link TABLE} object defined by the
+ * catalog.
+ *
+ * @param <TABLE>
+ */
+public interface CatalogSyncClient<TABLE> extends AutoCloseable {
+  /**
+   * Returns a unique identifier for the catalog, allows user to sync table to multiple catalogs of
+   * the same type eg: HMS catalogs with url1, HMS catalog with url2.
+   */
+  String getCatalogName();
+
+  /**
+   * Returns the full class name which implements the interface for CatalogSyncClient.
+   *
+   * @return catalogImplClassName
+   */
+  String getCatalogImpl();

Review Comment:
   Is this redundant with `getClass().getName()`?
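
If it is redundant, one option is a default method, so implementations no longer need to override it. Below is a minimal sketch built on a trimmed-down stand-in interface. `CatalogSyncClientSketch` and `HmsCatalogSyncClientSketch` are hypothetical names and are not part of the PR:

```java
public class Main {
  /** Hypothetical, trimmed-down stand-in for CatalogSyncClient (identity methods only). */
  interface CatalogSyncClientSketch {
    /** Unique name of this catalog instance, e.g. one per HMS URL. */
    String getCatalogName();

    /** Fully qualified name of the implementing class; defaults to the runtime class. */
    default String getCatalogImpl() {
      return getClass().getName();
    }
  }

  /** Hypothetical implementation that does not override getCatalogImpl(). */
  static class HmsCatalogSyncClientSketch implements CatalogSyncClientSketch {
    private final String catalogName;

    HmsCatalogSyncClientSketch(String catalogName) {
      this.catalogName = catalogName;
    }

    @Override
    public String getCatalogName() {
      return catalogName;
    }
  }

  public static void main(String[] args) {
    CatalogSyncClientSketch client = new HmsCatalogSyncClientSketch("hms-url1");
    // getCatalogImpl() returns the binary name of the runtime class,
    // e.g. "Main$HmsCatalogSyncClientSketch" for this nested class.
    System.out.println(client.getCatalogName() + " -> " + client.getCatalogImpl());
  }
}
```

An implementation that needs a different value can still override the default.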



##########
xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSyncClient.java:
##########
@@ -0,0 +1,78 @@
+
+  /** Returns the storage location of the table synced to the catalog. */
+  String getStorageDescriptorLocation(TABLE table);

Review Comment:
   What does a storage descriptor consist of? Is it just a path?



##########
xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSync.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.spi.sync;
+
+import static org.apache.xtable.spi.sync.CatalogUtils.hasStorageDescriptorLocationChanged;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import lombok.extern.log4j.Log4j2;
+
+import org.apache.commons.lang3.StringUtils;
+
+import org.apache.xtable.model.InternalTable;
+import org.apache.xtable.model.catalog.CatalogTableIdentifier;
+import org.apache.xtable.model.sync.SyncResult;
+import org.apache.xtable.model.sync.SyncResult.CatalogSyncStatus;
+
+/** Provides the functionality to sync metadata from InternalTable to multiple target catalogs */
+@Log4j2
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class CatalogSync {
+  private static final CatalogSync INSTANCE = new CatalogSync();
+
+  public static CatalogSync getInstance() {
+    return INSTANCE;
+  }
+
+  public SyncResult syncTable(
+      Map<CatalogTableIdentifier, CatalogSyncClient> catalogSyncClients, InternalTable table) {
+    List<CatalogSyncStatus> results = new ArrayList<>();
+    Instant startTime = Instant.now();
+    catalogSyncClients.forEach(
+        ((tableIdentifier, catalogSyncClient) -> {
+          try {
+            results.add(getCatalogSyncStatus(catalogSyncClient, tableIdentifier, table));
+            log.info(
+                "Catalog sync is successful for table {} using catalogSync {}",
+                table.getBasePath(),
+                catalogSyncClient.getCatalogImpl());
+          } catch (Exception e) {
+            log.error(
+                "Catalog sync failed for table {} using catalogSync {}",
+                table.getBasePath(),
+                catalogSyncClient.getCatalogImpl());
+            results.add(
+                getCatalogSyncFailureStatus(
+                    catalogSyncClient.getCatalogName(), catalogSyncClient.getCatalogImpl(), e));
+          }
+        }));
+    return SyncResult.builder()
+        .lastInstantSynced(table.getLatestCommitTime())
+        .syncStartTime(startTime)
+        .syncDuration(Duration.between(startTime, Instant.now()))
+        .catalogSyncStatusList(results)
+        .build();
+  }
+
+  private <TABLE> CatalogSyncStatus getCatalogSyncStatus(

Review Comment:
   Nitpick: call this `syncCatalog`? To me, `getCatalogSyncStatus` sounds like the sync has already run and you are just fetching the result from some store.
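
Below is a minimal sketch of the loop with the helper renamed to `syncCatalog`. It uses simplified stand-ins: `CatalogClient`, `CatalogSyncStatus`, `syncAll` and `fakeClient` are hypothetical, and the table identifier is a plain `String` rather than `CatalogTableIdentifier`. As in the PR's `syncTable`, a failure in one catalog is recorded and does not stop the others:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class Main {
  /** Hypothetical, simplified stand-in for CatalogSyncClient. */
  interface CatalogClient {
    String getCatalogName();

    /** Creates or updates the catalog entry for the table at basePath. */
    void syncTable(String tableIdentifier, String basePath) throws Exception;
  }

  /** Hypothetical, simplified stand-in for SyncResult.CatalogSyncStatus. */
  static final class CatalogSyncStatus {
    final String catalogName;
    final boolean success;
    final String errorMessage; // null on success

    CatalogSyncStatus(String catalogName, boolean success, String errorMessage) {
      this.catalogName = catalogName;
      this.success = success;
      this.errorMessage = errorMessage;
    }
  }

  /** Performs the sync for one catalog; the verb-first name makes clear this does the work. */
  static CatalogSyncStatus syncCatalog(CatalogClient client, String tableIdentifier, String basePath)
      throws Exception {
    client.syncTable(tableIdentifier, basePath);
    return new CatalogSyncStatus(client.getCatalogName(), true, null);
  }

  /** Syncs every catalog; each failure is recorded per catalog and the loop continues. */
  static List<CatalogSyncStatus> syncAll(Map<String, CatalogClient> clients, String basePath) {
    List<CatalogSyncStatus> results = new ArrayList<>();
    clients.forEach(
        (tableIdentifier, client) -> {
          try {
            results.add(syncCatalog(client, tableIdentifier, basePath));
          } catch (Exception e) {
            results.add(new CatalogSyncStatus(client.getCatalogName(), false, e.getMessage()));
          }
        });
    return results;
  }

  /** Hypothetical test double that either succeeds or throws on sync. */
  static CatalogClient fakeClient(String name, boolean fail) {
    return new CatalogClient() {
      @Override
      public String getCatalogName() {
        return name;
      }

      @Override
      public void syncTable(String tableIdentifier, String basePath) throws Exception {
        if (fail) {
          throw new Exception("cannot reach " + name);
        }
      }
    };
  }

  public static void main(String[] args) {
    Map<String, CatalogClient> clients = new LinkedHashMap<>();
    clients.put("database1.table1", fakeClient("hms-url1", false));
    clients.put("database2.table2", fakeClient("hms-url2", true));
    for (CatalogSyncStatus status : syncAll(clients, "/tmp/table")) {
      System.out.println(status.catalogName + " success=" + status.success);
    }
  }
}
```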



##########
xtable-utilities/src/test/java/org/apache/xtable/utilities/TestRunCatalogSync.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.utilities;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import lombok.SneakyThrows;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.Test;
+
+import org.apache.xtable.conversion.ExternalCatalogConfig;
+import org.apache.xtable.conversion.SourceTable;
+import org.apache.xtable.model.InternalTable;
+import org.apache.xtable.model.catalog.CatalogTableIdentifier;
+import org.apache.xtable.spi.extractor.CatalogConversionSource;
+import org.apache.xtable.spi.sync.CatalogSyncClient;
+
+class TestRunCatalogSync {
+
+  @SneakyThrows
+  @Test
+  void testMain() {
+    String catalogConfigYamlPath =
+        TestRunCatalogSync.class.getClassLoader().getResource("catalogConfig.yaml").getPath();
+    String[] args = {"-catalogConfig", catalogConfigYamlPath};
+    // Ensure yaml gets parsed and no op-sync implemented in TestCatalogImpl is called.
+    assertDoesNotThrow(() -> RunCatalogSync.main(args));
+  }
+
+  public static class TestCatalogImpl implements CatalogConversionSource, CatalogSyncClient {

Review Comment:
   Would it be simpler to use a mock object here?



##########
xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java:
##########
@@ -89,71 +103,126 @@ public <COMMIT> Map<String, SyncResult> sync(
     try (ConversionSource<COMMIT> conversionSource =
         conversionSourceProvider.getConversionSourceInstance(config.getSourceTable())) {
       ExtractFromSource<COMMIT> source = ExtractFromSource.of(conversionSource);
+      return syncTableFormats(config, source, config.getSyncMode());
+    } catch (IOException ioException) {
+      throw new ReadException("Failed to close source converter", ioException);
+    }
+  }
 
-      Map<String, ConversionTarget> conversionTargetByFormat =
-          config.getTargetTables().stream()
-              .collect(
-                  Collectors.toMap(
-                      TargetTable::getFormatName,
-                      targetTable -> conversionTargetFactory.createForFormat(targetTable, conf)));
-      // State for each TableFormat
-      Map<String, Optional<TableSyncMetadata>> lastSyncMetadataByFormat =
-          conversionTargetByFormat.entrySet().stream()
-              .collect(
-                  Collectors.toMap(
-                      Map.Entry::getKey, entry -> entry.getValue().getTableMetadata()));
-      Map<String, ConversionTarget> formatsToSyncIncrementally =
-          getFormatsToSyncIncrementally(
-              config,
-              conversionTargetByFormat,
-              lastSyncMetadataByFormat,
-              source.getConversionSource());
-      Map<String, ConversionTarget> formatsToSyncBySnapshot =
-          conversionTargetByFormat.entrySet().stream()
-              .filter(entry -> !formatsToSyncIncrementally.containsKey(entry.getKey()))
-              .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
-      SyncResultForTableFormats syncResultForSnapshotSync =
-          formatsToSyncBySnapshot.isEmpty()
-              ? SyncResultForTableFormats.builder().build()
-              : syncSnapshot(formatsToSyncBySnapshot, source);
-      SyncResultForTableFormats syncResultForIncrementalSync =
-          formatsToSyncIncrementally.isEmpty()
-              ? SyncResultForTableFormats.builder().build()
-              : syncIncrementalChanges(
-                  formatsToSyncIncrementally, lastSyncMetadataByFormat, source);
-      Map<String, SyncResult> syncResultsMerged =
-          new HashMap<>(syncResultForIncrementalSync.getLastSyncResult());
-      syncResultsMerged.putAll(syncResultForSnapshotSync.getLastSyncResult());
-      String successfulSyncs =
-          getFormatsWithStatusCode(syncResultsMerged, SyncResult.SyncStatusCode.SUCCESS);
-      if (!successfulSyncs.isEmpty()) {
-        log.info("Sync is successful for the following formats {}", successfulSyncs);
-      }
-      String failedSyncs =
-          getFormatsWithStatusCode(syncResultsMerged, SyncResult.SyncStatusCode.ERROR);
-      if (!failedSyncs.isEmpty()) {
-        log.error("Sync failed for the following formats {}", failedSyncs);
+  public Map<String, SyncResult> syncCatalogs(
+      ConversionConfig config, Map<String, ConversionSourceProvider> conversionSourceProvider) {
+    if (config.getTargetTables() == null || config.getTargetTables().isEmpty()) {
+      throw new IllegalArgumentException("Please provide at-least one format to sync");
+    }
+    try (ConversionSource conversionSource =
+        conversionSourceProvider
+            .get(config.getSourceTable().getFormatName())
+            .getConversionSourceInstance(config.getSourceTable())) {
+      ExtractFromSource source = ExtractFromSource.of(conversionSource);
+      Map<String, SyncResult> tableFormatSyncResults =
+          syncTableFormats(config, source, config.getSyncMode());
+      Map<String, SyncResult> catalogSyncResults = new HashMap<>();
+      for (TargetTable targetTable : config.getTargetTables()) {
+        Map<CatalogTableIdentifier, CatalogSyncClient> catalogSyncClients =
+            config.getTargetCatalogs().get(targetTable.getId()).stream()
+                .collect(
+                    Collectors.toMap(
+                        TargetCatalogConfig::getCatalogTableIdentifier,
+                        targetCatalog ->
+                            catalogConversionFactory.createCatalogSyncClient(
+                                targetCatalog.getCatalogConfig(), conf)));
+        catalogSyncResults.put(
+            targetTable.getFormatName(),
+            syncCatalogsForTable(
+                targetTable,
+                catalogSyncClients,
+                conversionSourceProvider.get(targetTable.getFormatName())));
       }
-      return syncResultsMerged;
+      mergeSyncResults(tableFormatSyncResults, catalogSyncResults);
+      return tableFormatSyncResults;
     } catch (IOException ioException) {
       throw new ReadException("Failed to close source converter", ioException);
     }
   }
 
+  private <COMMIT> Map<String, SyncResult> syncTableFormats(
+      ConversionConfig config, ExtractFromSource<COMMIT> source, SyncMode syncMode) {
+    Map<String, ConversionTarget> conversionTargetByFormat =
+        config.getTargetTables().stream()
+            .filter(
+                targetTable ->
+                    !targetTable.getFormatName().equals(config.getSourceTable().getFormatName()))
+            .collect(
+                Collectors.toMap(
+                    TargetTable::getFormatName,
+                    targetTable -> conversionTargetFactory.createForFormat(targetTable, conf)));
+
+    Map<String, Optional<TableSyncMetadata>> lastSyncMetadataByFormat =
+        conversionTargetByFormat.entrySet().stream()
+            .collect(
+                Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().getTableMetadata()));
+    Map<String, ConversionTarget> formatsToSyncIncrementally =
+        getFormatsToSyncIncrementally(
+            syncMode,
+            conversionTargetByFormat,
+            lastSyncMetadataByFormat,
+            source.getConversionSource());
+    Map<String, ConversionTarget> formatsToSyncBySnapshot =
+        conversionTargetByFormat.entrySet().stream()
+            .filter(entry -> !formatsToSyncIncrementally.containsKey(entry.getKey()))
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    SyncResultForTableFormats syncResultForSnapshotSync =
+        formatsToSyncBySnapshot.isEmpty()
+            ? SyncResultForTableFormats.builder().build()
+            : syncSnapshot(formatsToSyncBySnapshot, source);
+    SyncResultForTableFormats syncResultForIncrementalSync =
+        formatsToSyncIncrementally.isEmpty()
+            ? SyncResultForTableFormats.builder().build()
+            : syncIncrementalChanges(formatsToSyncIncrementally, lastSyncMetadataByFormat, source);
+    Map<String, SyncResult> syncResultsMerged =
+        new HashMap<>(syncResultForIncrementalSync.getLastSyncResult());
+    syncResultsMerged.putAll(syncResultForSnapshotSync.getLastSyncResult());
+    String successfulSyncs =
+        getFormatsWithStatusCode(syncResultsMerged, SyncResult.SyncStatusCode.SUCCESS);
+    if (!successfulSyncs.isEmpty()) {
+      log.info("Sync is successful for the following formats {}", successfulSyncs);
+    }
+    String failedSyncs =
+        getFormatsWithStatusCode(syncResultsMerged, SyncResult.SyncStatusCode.ERROR);
+    if (!failedSyncs.isEmpty()) {
+      log.error("Sync failed for the following formats {}", failedSyncs);
+    }

Review Comment:
   Do we want similar logging with the catalog results?
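
Below is a minimal sketch of what that could look like, mirroring the table-format logging above. `CatalogStatus`, `StatusCode`, `getCatalogsWithStatusCode` and `logCatalogSyncResults` are hypothetical stand-ins. The sketch uses `java.util.logging` in place of the class's `log` field only to stay dependency-free:

```java
import java.util.Arrays;
import java.util.List;
import java.util.logging.Logger;
import java.util.stream.Collectors;

public class Main {
  private static final Logger LOG = Logger.getLogger(Main.class.getName());

  /** Hypothetical stand-in for SyncResult.SyncStatusCode. */
  enum StatusCode {
    SUCCESS,
    ERROR
  }

  /** Hypothetical, simplified per-catalog sync status. */
  static final class CatalogStatus {
    final String catalogName;
    final StatusCode statusCode;

    CatalogStatus(String catalogName, StatusCode statusCode) {
      this.catalogName = catalogName;
      this.statusCode = statusCode;
    }
  }

  /** Comma-separated names of the catalogs whose sync ended with the given status code. */
  static String getCatalogsWithStatusCode(List<CatalogStatus> statuses, StatusCode code) {
    return statuses.stream()
        .filter(status -> status.statusCode == code)
        .map(status -> status.catalogName)
        .collect(Collectors.joining(","));
  }

  /** Logs successful catalogs at INFO and failed ones at SEVERE, like the format logging. */
  static void logCatalogSyncResults(String formatName, List<CatalogStatus> statuses) {
    String successfulSyncs = getCatalogsWithStatusCode(statuses, StatusCode.SUCCESS);
    if (!successfulSyncs.isEmpty()) {
      LOG.info("Catalog sync is successful for format " + formatName + " in: " + successfulSyncs);
    }
    String failedSyncs = getCatalogsWithStatusCode(statuses, StatusCode.ERROR);
    if (!failedSyncs.isEmpty()) {
      LOG.severe("Catalog sync failed for format " + formatName + " in: " + failedSyncs);
    }
  }

  public static void main(String[] args) {
    List<CatalogStatus> statuses =
        Arrays.asList(
            new CatalogStatus("hms-url1", StatusCode.SUCCESS),
            new CatalogStatus("hms-url2", StatusCode.ERROR));
    logCatalogSyncResults("ICEBERG", statuses);
  }
}
```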



##########
xtable-api/src/test/java/org/apache/xtable/spi/sync/TestCatalogSync.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.spi.sync;
+
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.xtable.model.InternalTable;
+import org.apache.xtable.model.catalog.CatalogTableIdentifier;
+import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.PartitionTransformType;
+import org.apache.xtable.model.sync.SyncResult;
+
+@ExtendWith(MockitoExtension.class)
+public class TestCatalogSync<TABLE> {
+
+  private final CatalogSyncClient<TABLE> mockClient1 = mock(CatalogSyncClient.class);
+  private final CatalogSyncClient<TABLE> mockClient2 = mock(CatalogSyncClient.class);
+  private final CatalogSyncClient<TABLE> mockClient3 = mock(CatalogSyncClient.class);
+  private final CatalogSyncClient<TABLE> mockClient4 = mock(CatalogSyncClient.class);
+
+  private final CatalogTableIdentifier tableIdentifier1 =
+      CatalogTableIdentifier.builder().databaseName("database1").tableName("table1").build();
+  private final CatalogTableIdentifier tableIdentifier2 =
+      CatalogTableIdentifier.builder().databaseName("database2").tableName("table2").build();
+  private final CatalogTableIdentifier tableIdentifier3 =
+      CatalogTableIdentifier.builder().databaseName("database3").tableName("table3").build();
+  private final CatalogTableIdentifier tableIdentifier4 =
+      CatalogTableIdentifier.builder().databaseName("database4").tableName("table4").build();
+
+  @Mock TABLE mockTable;

Review Comment:
   Nitpick: let's consistently use either `@Mock` or `mock(...)` within the same class.



##########
xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java:
##########
@@ -89,71 +103,126 @@ public <COMMIT> Map<String, SyncResult> sync(
+  public Map<String, SyncResult> syncCatalogs(

Review Comment:
   Do we want to name this so it is clear that it also syncs the table metadata?



-- 
This is an automated message from the Apache Git Service.