This is an automated email from the ASF dual-hosted git repository.

vinish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git


The following commit(s) were added to refs/heads/main by this push:
     new 5f25adc8 Refactor RunSync Class for Improved Modularity and Reusability
5f25adc8 is described below

commit 5f25adc8ebe4a6f7c0c307aafda42e9273f580d1
Author: vaibhavk1992 <[email protected]>
AuthorDate: Tue Mar 25 23:13:29 2025 +0530

    Refactor RunSync Class for Improved Modularity and Reusability
    
    removing extra spaces
    
    moving config file to resources
    
    adding spotless apply
---
 .../apache/xtable/utilities/RunCatalogSync.java    |   9 +-
 .../java/org/apache/xtable/utilities/RunSync.java  | 217 +++++++++++++--------
 .../org/apache/xtable/utilities/TestRunSync.java   |  28 ++-
 xtable-utilities/src/test/resources/my_config.yaml |  24 +++
 4 files changed, 192 insertions(+), 86 deletions(-)

diff --git a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java
index 899365e4..43549d1b 100644
--- a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java
+++ b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java
@@ -19,6 +19,7 @@
 package org.apache.xtable.utilities;
 
 import static org.apache.xtable.utilities.RunSync.getCustomConfigurations;
+import static org.apache.xtable.utilities.RunSync.getValueFromConfig;
 import static org.apache.xtable.utilities.RunSync.loadHadoopConf;
 import static org.apache.xtable.utilities.RunSync.loadTableFormatConversionConfigs;
 
@@ -131,11 +132,11 @@ public class RunCatalogSync {
             Paths.get(cmd.getOptionValue(CATALOG_SOURCE_AND_TARGET_CONFIG_PATH)))) {
       datasetConfig = YAML_MAPPER.readValue(inputStream, DatasetConfig.class);
     }
-
-    byte[] customConfig = getCustomConfigurations(cmd, HADOOP_CONFIG_PATH);
+    String hadoopConfigpath = getValueFromConfig(cmd, HADOOP_CONFIG_PATH);
+    byte[] customConfig = getCustomConfigurations(hadoopConfigpath);
     Configuration hadoopConf = loadHadoopConf(customConfig);
-
-    customConfig = getCustomConfigurations(cmd, CONVERTERS_CONFIG_PATH);
+    String conversionProviderConfigpath = getValueFromConfig(cmd, CONVERTERS_CONFIG_PATH);
+    customConfig = getCustomConfigurations(conversionProviderConfigpath);
     TableFormatConverters tableFormatConverters = loadTableFormatConversionConfigs(customConfig);
 
     Map<String, ExternalCatalogConfig> catalogsById =
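
The RunCatalogSync change above swaps the old CommandLine-based accessor for a two-step, path-based pattern: resolve the option value first, then read the file only if a path was supplied. A minimal sketch of that pattern, assuming a parsed CommandLine cmd and using the identifiers from the diff:

    // Step 1: look up the raw option value; returns null when the flag is absent.
    String hadoopConfigpath = getValueFromConfig(cmd, HADOOP_CONFIG_PATH);
    // Step 2: read the config bytes; getCustomConfigurations(null) simply returns null,
    // so loadHadoopConf can fall back to the default Hadoop configuration
    // (see testLoadDefaultHadoopConfig below).
    byte[] customConfig = getCustomConfigurations(hadoopConfigpath);
    Configuration hadoopConf = loadHadoopConf(customConfig);

Decoupling file reading from CLI parsing is what lets the same helpers serve RunCatalogSync, the tests, and any programmatic caller.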
diff --git a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java
index 8ac4e6e4..facbcf3a 100644
--- a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java
+++ b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java
@@ -33,6 +33,7 @@ import java.util.stream.Collectors;
 
 import lombok.Builder;
 import lombok.Data;
+import lombok.NonNull;
 import lombok.Value;
 import lombok.extern.jackson.Jacksonized;
 import lombok.extern.log4j.Log4j2;
@@ -51,6 +52,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
 import com.google.common.annotations.VisibleForTesting;
 
+import org.apache.xtable.conversion.CatalogConfig;
 import org.apache.xtable.conversion.ConversionConfig;
 import org.apache.xtable.conversion.ConversionController;
 import org.apache.xtable.conversion.ConversionSourceProvider;
@@ -115,6 +117,126 @@ public class RunSync {
               "The interval in seconds to schedule the loop. Requires --continuousMode to be set. Defaults to 5 seconds.")
           .addOption(HELP_OPTION, "help", false, "Displays help information to run this utility");
 
+  static SourceTable sourceTableBuilder(
+      @NonNull DatasetConfig.Table table,
+      CatalogConfig catalogConfig,
+      @NonNull DatasetConfig datasetConfig,
+      Properties sourceProperties) {
+    SourceTable sourceTable =
+        SourceTable.builder()
+            .name(table.getTableName())
+            .basePath(table.getTableBasePath())
+            .namespace(table.getNamespace() == null ? null : table.getNamespace().split("\\."))
+            .dataPath(table.getTableDataPath())
+            .catalogConfig(catalogConfig)
+            .additionalProperties(sourceProperties)
+            .formatName(datasetConfig.sourceFormat)
+            .build();
+    return sourceTable;
+  }
+
+  static List<TargetTable> targetTableBuilder(
+      @NonNull DatasetConfig.Table table,
+      CatalogConfig catalogConfig,
+      @NonNull List<String> tableFormatList) {
+    List<TargetTable> targetTables =
+        tableFormatList.stream()
+            .map(
+                tableFormat ->
+                    TargetTable.builder()
+                        .name(table.getTableName())
+                        .basePath(table.getTableBasePath())
+                        .namespace(
+                            table.getNamespace() == null ? null : table.getNamespace().split("\\."))
+                        .catalogConfig(catalogConfig)
+                        .formatName(tableFormat)
+                        .build())
+            .collect(Collectors.toList());
+    return targetTables;
+  }
+
+  static void syncTableMetdata(
+      DatasetConfig datasetConfig,
+      List<String> tableFormatList,
+      CatalogConfig catalogConfig,
+      Configuration hadoopConf,
+      ConversionSourceProvider conversionSourceProvider) {
+    ConversionController conversionController = new ConversionController(hadoopConf);
+    for (DatasetConfig.Table table : datasetConfig.getDatasets()) {
+      log.info(
+          "Running sync for basePath {} for following table formats {}",
+          table.getTableBasePath(),
+          tableFormatList);
+      Properties sourceProperties = new Properties();
+      if (table.getPartitionSpec() != null) {
+        sourceProperties.put(
+            HudiSourceConfig.PARTITION_FIELD_SPEC_CONFIG, table.getPartitionSpec());
+      }
+
+      SourceTable sourceTable =
+          sourceTableBuilder(table, catalogConfig, datasetConfig, sourceProperties);
+      List<TargetTable> targetTables = targetTableBuilder(table, catalogConfig, tableFormatList);
+      ConversionConfig conversionConfig =
+          ConversionConfig.builder()
+              .sourceTable(sourceTable)
+              .targetTables(targetTables)
+              .syncMode(SyncMode.INCREMENTAL)
+              .build();
+      try {
+        conversionController.sync(conversionConfig, conversionSourceProvider);
+      } catch (Exception e) {
+        log.error("Error running sync for {}", table.getTableBasePath(), e);
+      }
+    }
+  }
+
+  static DatasetConfig getDatasetConfig(String datasetConfigPath) throws IOException {
+    // Initialize DatasetConfig
+    try (InputStream inputStream = Files.newInputStream(Paths.get(datasetConfigPath))) {
+      return YAML_MAPPER.readValue(inputStream, DatasetConfig.class);
+    }
+  }
+
+  static Configuration gethadoopConf(String hadoopConfigPath) throws IOException {
+    // Load configurations
+    byte[] customConfig = getCustomConfigurations(hadoopConfigPath);
+    Configuration hadoopConf = loadHadoopConf(customConfig);
+    return hadoopConf;
+  }
+
+  static CatalogConfig getIcebergCatalogConfig(String icebergCatalogConfigPath) throws IOException {
+    // Load configurations
+    byte[] icebergCatalogConfigInput = getCustomConfigurations(icebergCatalogConfigPath);
+    CatalogConfig catalogConfig = loadIcebergCatalogConfig(icebergCatalogConfigInput);
+    return catalogConfig;
+  }
+
+  static ConversionSourceProvider<?> getConversionSourceProvider(
+      String conversionProviderConfigpath, DatasetConfig datasetConfig, Configuration hadoopConf)
+      throws IOException {
+    // Process source format
+    String sourceFormat = datasetConfig.sourceFormat;
+    byte[] customConfig = getCustomConfigurations(conversionProviderConfigpath);
+    TableFormatConverters tableFormatConverters = loadTableFormatConversionConfigs(customConfig);
+    TableFormatConverters.ConversionConfig sourceConversionConfig =
+        tableFormatConverters.getTableFormatConverters().get(sourceFormat);
+    if (sourceConversionConfig == null) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Source format %s is not supported. Known source and target formats are %s",
+              sourceFormat, tableFormatConverters.getTableFormatConverters().keySet()));
+    }
+    String sourceProviderClass = sourceConversionConfig.conversionSourceProviderClass;
+    ConversionSourceProvider<?> conversionSourceProvider =
+        ReflectionUtils.createInstanceOfClass(sourceProviderClass);
+    conversionSourceProvider.init(hadoopConf);
+    return conversionSourceProvider;
+  }
+
+  static String getValueFromConfig(CommandLine cmd, String configFlag) {
+    return cmd.getOptionValue(configFlag);
+  }
+
   public static void main(String[] args) throws IOException {
     CommandLineParser parser = new DefaultParser();
 
@@ -162,89 +284,24 @@ public class RunSync {
   }
 
   private static void runSync(CommandLine cmd) throws IOException {
-    DatasetConfig datasetConfig;
-    try (InputStream inputStream =
-        Files.newInputStream(Paths.get(cmd.getOptionValue(DATASET_CONFIG_OPTION)))) {
-      datasetConfig = YAML_MAPPER.readValue(inputStream, DatasetConfig.class);
-    }
-
-    byte[] customConfig = getCustomConfigurations(cmd, HADOOP_CONFIG_PATH);
-    Configuration hadoopConf = loadHadoopConf(customConfig);
-    byte[] icebergCatalogConfigInput = getCustomConfigurations(cmd, ICEBERG_CATALOG_CONFIG_PATH);
-    IcebergCatalogConfig icebergCatalogConfig = loadIcebergCatalogConfig(icebergCatalogConfigInput);
-
-    String sourceFormat = datasetConfig.sourceFormat;
-    customConfig = getCustomConfigurations(cmd, CONVERTERS_CONFIG_PATH);
-    TableFormatConverters tableFormatConverters = loadTableFormatConversionConfigs(customConfig);
-    TableFormatConverters.ConversionConfig sourceConversionConfig =
-        tableFormatConverters.getTableFormatConverters().get(sourceFormat);
-    if (sourceConversionConfig == null) {
-      throw new IllegalArgumentException(
-          String.format(
-              "Source format %s is not supported. Known source and target formats are %s",
-              sourceFormat, tableFormatConverters.getTableFormatConverters().keySet()));
-    }
-    String sourceProviderClass = sourceConversionConfig.conversionSourceProviderClass;
-    ConversionSourceProvider<?> conversionSourceProvider =
-        ReflectionUtils.createInstanceOfClass(sourceProviderClass);
-    conversionSourceProvider.init(hadoopConf);
-
+    String datasetConfigpath = getValueFromConfig(cmd, DATASET_CONFIG_OPTION);
+    String icebergCatalogConfigpath = getValueFromConfig(cmd, ICEBERG_CATALOG_CONFIG_PATH);
+    String hadoopConfigpath = getValueFromConfig(cmd, HADOOP_CONFIG_PATH);
+    String conversionProviderConfigpath = getValueFromConfig(cmd, CONVERTERS_CONFIG_PATH);
+    DatasetConfig datasetConfig = getDatasetConfig(datasetConfigpath);
+    CatalogConfig catalogConfig = getIcebergCatalogConfig(icebergCatalogConfigpath);
+    Configuration hadoopConf = gethadoopConf(hadoopConfigpath);
+    ConversionSourceProvider conversionSourceProvider =
+        getConversionSourceProvider(conversionProviderConfigpath, datasetConfig, hadoopConf);
     List<String> tableFormatList = datasetConfig.getTargetFormats();
-    ConversionController conversionController = new ConversionController(hadoopConf);
-    for (DatasetConfig.Table table : datasetConfig.getDatasets()) {
-      log.info(
-          "Running sync for basePath {} for following table formats {}",
-          table.getTableBasePath(),
-          tableFormatList);
-      Properties sourceProperties = new Properties();
-      if (table.getPartitionSpec() != null) {
-        sourceProperties.put(
-            HudiSourceConfig.PARTITION_FIELD_SPEC_CONFIG, table.getPartitionSpec());
-      }
-      SourceTable sourceTable =
-          SourceTable.builder()
-              .name(table.getTableName())
-              .basePath(table.getTableBasePath())
-              .namespace(table.getNamespace() == null ? null : table.getNamespace().split("\\."))
-              .dataPath(table.getTableDataPath())
-              .catalogConfig(icebergCatalogConfig)
-              .additionalProperties(sourceProperties)
-              .formatName(sourceFormat)
-              .build();
-      List<TargetTable> targetTables =
-          tableFormatList.stream()
-              .map(
-                  tableFormat ->
-                      TargetTable.builder()
-                          .name(table.getTableName())
-                          .basePath(table.getTableBasePath())
-                          .namespace(
-                              table.getNamespace() == null
-                                  ? null
-                                  : table.getNamespace().split("\\."))
-                          .catalogConfig(icebergCatalogConfig)
-                          .formatName(tableFormat)
-                          .build())
-              .collect(Collectors.toList());
-
-      ConversionConfig conversionConfig =
-          ConversionConfig.builder()
-              .sourceTable(sourceTable)
-              .targetTables(targetTables)
-              .syncMode(SyncMode.INCREMENTAL)
-              .build();
-      try {
-        conversionController.sync(conversionConfig, conversionSourceProvider);
-      } catch (Exception e) {
-        log.error("Error running sync for {}", table.getTableBasePath(), e);
-      }
-    }
+    syncTableMetdata(
+        datasetConfig, tableFormatList, catalogConfig, hadoopConf, conversionSourceProvider);
   }
 
-  static byte[] getCustomConfigurations(CommandLine cmd, String option) throws IOException {
+  static byte[] getCustomConfigurations(String Configpath) throws IOException {
     byte[] customConfig = null;
-    if (cmd.hasOption(option)) {
-      customConfig = Files.readAllBytes(Paths.get(cmd.getOptionValue(option)));
+    if (Configpath != null) {
+      customConfig = Files.readAllBytes(Paths.get(Configpath));
     }
     return customConfig;
   }
@@ -282,7 +339,7 @@ public class RunSync {
   }
 
   @VisibleForTesting
-  static IcebergCatalogConfig loadIcebergCatalogConfig(byte[] customConfigs) throws IOException {
+  static CatalogConfig loadIcebergCatalogConfig(byte[] customConfigs) throws IOException {
     return customConfigs == null
         ? null
         : YAML_MAPPER.readValue(customConfigs, IcebergCatalogConfig.class);
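
Taken together, the extracted helpers make the end-to-end flow callable without going through the CLI at all. A minimal sketch, assuming a caller in the org.apache.xtable.utilities package (the helpers are package-private) and a hypothetical config path; null stands in for absent optional flags, which the helpers handle by falling back to defaults, exactly as runSync does when only --datasetConfig is given:

    // Load the dataset config from a YAML file (path is hypothetical).
    DatasetConfig datasetConfig = RunSync.getDatasetConfig("/path/to/my_config.yaml");
    // Null custom configs: default Hadoop conf, no Iceberg catalog config.
    Configuration hadoopConf = RunSync.gethadoopConf(null);
    CatalogConfig catalogConfig = RunSync.getIcebergCatalogConfig(null);
    // Resolve the source-format provider from the default converter configs.
    ConversionSourceProvider<?> provider =
        RunSync.getConversionSourceProvider(null, datasetConfig, hadoopConf);
    // Run an incremental sync of each dataset to every target format.
    RunSync.syncTableMetdata(
        datasetConfig, datasetConfig.getTargetFormats(), catalogConfig, hadoopConf, provider);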
diff --git a/xtable-utilities/src/test/java/org/apache/xtable/utilities/TestRunSync.java b/xtable-utilities/src/test/java/org/apache/xtable/utilities/TestRunSync.java
index 20a190e8..273854a0 100644
--- a/xtable-utilities/src/test/java/org/apache/xtable/utilities/TestRunSync.java
+++ b/xtable-utilities/src/test/java/org/apache/xtable/utilities/TestRunSync.java
@@ -23,18 +23,42 @@ import static org.apache.xtable.model.storage.TableFormat.HUDI;
 import static org.apache.xtable.model.storage.TableFormat.ICEBERG;
 
 import java.io.IOException;
+import java.net.URL;
 import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
-import org.apache.xtable.iceberg.IcebergCatalogConfig;
+import org.apache.xtable.conversion.CatalogConfig;
+import org.apache.xtable.utilities.RunSync.DatasetConfig;
 import org.apache.xtable.utilities.RunSync.TableFormatConverters;
 import org.apache.xtable.utilities.RunSync.TableFormatConverters.ConversionConfig;
 
 class TestRunSync {
 
+  @Test
+  public void testMain() {
+    String filePath = TestRunSync.class.getClassLoader().getResource("my_config.yaml").getPath();
+    String[] args = new String[] {"--datasetConfig", filePath};
+    Assertions.assertDoesNotThrow(
+        () -> RunSync.main(args), "RunSync.main() threw an unexpected exception.");
+  }
+
+  @Test
+  public void testGetDatasetConfigWithNonExistentFile() {
+    URL resourceUrl = TestRunSync.class.getClassLoader().getResource("my_config1.yaml");
+    Assertions.assertNull(resourceUrl, "Config file not found in classpath");
+  }
+
+  @Test
+  public void testGetDatasetConfigWithValidYAML() throws IOException {
+    String filePath = TestRunSync.class.getClassLoader().getResource("my_config.yaml").getPath();
+    DatasetConfig config = RunSync.getDatasetConfig(filePath);
+    // Assert
+    Assertions.assertNotNull(config);
+  }
+
   /** Tests that the default hadoop configs are loaded. */
   @Test
   public void testLoadDefaultHadoopConfig() {
@@ -142,7 +166,7 @@ class TestRunSync {
             + "catalogOptions: \n"
             + "  option1: value1\n"
             + "  option2: value2";
-    IcebergCatalogConfig catalogConfig = RunSync.loadIcebergCatalogConfig(icebergConfig.getBytes());
+    CatalogConfig catalogConfig = RunSync.loadIcebergCatalogConfig(icebergConfig.getBytes());
     Assertions.assertEquals("org.apache.xtable.CatalogImpl", catalogConfig.getCatalogImpl());
     Assertions.assertEquals("test", catalogConfig.getCatalogName());
     Assertions.assertEquals(2, catalogConfig.getCatalogOptions().size());
diff --git a/xtable-utilities/src/test/resources/my_config.yaml b/xtable-utilities/src/test/resources/my_config.yaml
new file mode 100644
index 00000000..1416c04c
--- /dev/null
+++ b/xtable-utilities/src/test/resources/my_config.yaml
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+sourceFormat: ICEBERG
+targetFormats:
+  - DELTA
+datasets:
+  -
+    tableBasePath:  /Desktop/opensource/iceberg/warehouse/demo/nyc/taxis
+    tableDataPath:  /Desktop/opensource/iceberg/warehouse/demo/nyc/taxis/data
+    tableName: taxis
\ No newline at end of file
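
With the config checked in under src/test/resources, the same file doubles as a template for running the utility by hand; --datasetConfig is the only required flag, as testMain above demonstrates. A sketch of the invocation, assuming the bundled jar produced by the xtable-utilities module (the exact jar name and version depend on your build):

    java -jar xtable-utilities/target/xtable-utilities-*-bundled.jar \
        --datasetConfig xtable-utilities/src/test/resources/my_config.yaml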
