This is an automated email from the ASF dual-hosted git repository.
vinish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git
The following commit(s) were added to refs/heads/main by this push:
new 5f25adc8 Refactor RunSync Class for Improved Modularity and Reusability
5f25adc8 is described below
commit 5f25adc8ebe4a6f7c0c307aafda42e9273f580d1
Author: vaibhavk1992 <[email protected]>
AuthorDate: Tue Mar 25 23:13:29 2025 +0530
Refactor RunSync Class for Improved Modularity and Reusability
removing extra spaces
moving config file to resources
adding spotless apply
---
.../apache/xtable/utilities/RunCatalogSync.java | 9 +-
.../java/org/apache/xtable/utilities/RunSync.java | 217 +++++++++++++--------
.../org/apache/xtable/utilities/TestRunSync.java | 28 ++-
xtable-utilities/src/test/resources/my_config.yaml | 24 +++
4 files changed, 192 insertions(+), 86 deletions(-)
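
In short, the commit decomposes runSync(...) into reusable static helpers. A minimal sketch of the resulting flow, using only names from the diff below (error handling elided; not a verbatim excerpt):

    private static void runSync(CommandLine cmd) throws IOException {
      DatasetConfig datasetConfig = getDatasetConfig(getValueFromConfig(cmd, DATASET_CONFIG_OPTION));
      CatalogConfig catalogConfig = getIcebergCatalogConfig(getValueFromConfig(cmd, ICEBERG_CATALOG_CONFIG_PATH));
      Configuration hadoopConf = getHadoopConf(getValueFromConfig(cmd, HADOOP_CONFIG_PATH));
      ConversionSourceProvider<?> provider =
          getConversionSourceProvider(getValueFromConfig(cmd, CONVERTERS_CONFIG_PATH), datasetConfig, hadoopConf);
      syncTableMetadata(datasetConfig, datasetConfig.getTargetFormats(), catalogConfig, hadoopConf, provider);
    }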
diff --git a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java
index 899365e4..43549d1b 100644
--- a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java
+++ b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java
@@ -19,6 +19,7 @@
package org.apache.xtable.utilities;
import static org.apache.xtable.utilities.RunSync.getCustomConfigurations;
+import static org.apache.xtable.utilities.RunSync.getValueFromConfig;
import static org.apache.xtable.utilities.RunSync.loadHadoopConf;
import static org.apache.xtable.utilities.RunSync.loadTableFormatConversionConfigs;
@@ -131,11 +132,11 @@ public class RunCatalogSync {
Paths.get(cmd.getOptionValue(CATALOG_SOURCE_AND_TARGET_CONFIG_PATH)))) {
datasetConfig = YAML_MAPPER.readValue(inputStream, DatasetConfig.class);
}
-
-    byte[] customConfig = getCustomConfigurations(cmd, HADOOP_CONFIG_PATH);
+    String hadoopConfigPath = getValueFromConfig(cmd, HADOOP_CONFIG_PATH);
+    byte[] customConfig = getCustomConfigurations(hadoopConfigPath);
     Configuration hadoopConf = loadHadoopConf(customConfig);
-
-    customConfig = getCustomConfigurations(cmd, CONVERTERS_CONFIG_PATH);
+    String conversionProviderConfigPath = getValueFromConfig(cmd, CONVERTERS_CONFIG_PATH);
+    customConfig = getCustomConfigurations(conversionProviderConfigPath);
     TableFormatConverters tableFormatConverters = loadTableFormatConversionConfigs(customConfig);
Map<String, ExternalCatalogConfig> catalogsById =
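
getCustomConfigurations now takes a nullable path rather than a CommandLine, so RunCatalogSync resolves the flag itself via getValueFromConfig. A small sketch of the contract (behavior inferred from the null check in RunSync below and from testLoadDefaultHadoopConfig):

    // When the flag is absent, getValueFromConfig returns null,
    // getCustomConfigurations(null) returns null, and loadHadoopConf(null)
    // falls back to the default Hadoop configs.
    String hadoopConfigPath = getValueFromConfig(cmd, HADOOP_CONFIG_PATH); // may be null
    Configuration hadoopConf = loadHadoopConf(getCustomConfigurations(hadoopConfigPath));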
diff --git a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java
index 8ac4e6e4..facbcf3a 100644
--- a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java
+++ b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java
@@ -33,6 +33,7 @@ import java.util.stream.Collectors;
import lombok.Builder;
import lombok.Data;
+import lombok.NonNull;
import lombok.Value;
import lombok.extern.jackson.Jacksonized;
import lombok.extern.log4j.Log4j2;
@@ -51,6 +52,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.xtable.conversion.CatalogConfig;
import org.apache.xtable.conversion.ConversionConfig;
import org.apache.xtable.conversion.ConversionController;
import org.apache.xtable.conversion.ConversionSourceProvider;
@@ -115,6 +117,126 @@ public class RunSync {
"The interval in seconds to schedule the loop. Requires
--continuousMode to be set. Defaults to 5 seconds.")
.addOption(HELP_OPTION, "help", false, "Displays help information to
run this utility");
+  static SourceTable sourceTableBuilder(
+      @NonNull DatasetConfig.Table table,
+      CatalogConfig catalogConfig,
+      @NonNull DatasetConfig datasetConfig,
+      Properties sourceProperties) {
+    SourceTable sourceTable =
+        SourceTable.builder()
+            .name(table.getTableName())
+            .basePath(table.getTableBasePath())
+            .namespace(table.getNamespace() == null ? null : table.getNamespace().split("\\."))
+            .dataPath(table.getTableDataPath())
+            .catalogConfig(catalogConfig)
+            .additionalProperties(sourceProperties)
+            .formatName(datasetConfig.sourceFormat)
+            .build();
+    return sourceTable;
+  }
+
+  static List<TargetTable> targetTableBuilder(
+      @NonNull DatasetConfig.Table table,
+      CatalogConfig catalogConfig,
+      @NonNull List<String> tableFormatList) {
+    List<TargetTable> targetTables =
+        tableFormatList.stream()
+            .map(
+                tableFormat ->
+                    TargetTable.builder()
+                        .name(table.getTableName())
+                        .basePath(table.getTableBasePath())
+                        .namespace(
+                            table.getNamespace() == null ? null : table.getNamespace().split("\\."))
+                        .catalogConfig(catalogConfig)
+                        .formatName(tableFormat)
+                        .build())
+            .collect(Collectors.toList());
+    return targetTables;
+  }
+
+  static void syncTableMetadata(
+      DatasetConfig datasetConfig,
+      List<String> tableFormatList,
+      CatalogConfig catalogConfig,
+      Configuration hadoopConf,
+      ConversionSourceProvider conversionSourceProvider) {
+    ConversionController conversionController = new ConversionController(hadoopConf);
+    for (DatasetConfig.Table table : datasetConfig.getDatasets()) {
+      log.info(
+          "Running sync for basePath {} for following table formats {}",
+          table.getTableBasePath(),
+          tableFormatList);
+      Properties sourceProperties = new Properties();
+      if (table.getPartitionSpec() != null) {
+        sourceProperties.put(
+            HudiSourceConfig.PARTITION_FIELD_SPEC_CONFIG, table.getPartitionSpec());
+      }
+
+      SourceTable sourceTable =
+          sourceTableBuilder(table, catalogConfig, datasetConfig, sourceProperties);
+      List<TargetTable> targetTables = targetTableBuilder(table, catalogConfig, tableFormatList);
+      ConversionConfig conversionConfig =
+          ConversionConfig.builder()
+              .sourceTable(sourceTable)
+              .targetTables(targetTables)
+              .syncMode(SyncMode.INCREMENTAL)
+              .build();
+      try {
+        conversionController.sync(conversionConfig, conversionSourceProvider);
+      } catch (Exception e) {
+        log.error("Error running sync for {}", table.getTableBasePath(), e);
+      }
+    }
+  }
+
+  static DatasetConfig getDatasetConfig(String datasetConfigPath) throws IOException {
+    // Initialize DatasetConfig
+    try (InputStream inputStream = Files.newInputStream(Paths.get(datasetConfigPath))) {
+      return YAML_MAPPER.readValue(inputStream, DatasetConfig.class);
+    }
+  }
+
+  static Configuration getHadoopConf(String hadoopConfigPath) throws IOException {
+    // Load configurations
+    byte[] customConfig = getCustomConfigurations(hadoopConfigPath);
+    Configuration hadoopConf = loadHadoopConf(customConfig);
+    return hadoopConf;
+  }
+
+  static CatalogConfig getIcebergCatalogConfig(String icebergCatalogConfigPath) throws IOException {
+    // Load configurations
+    byte[] icebergCatalogConfigInput = getCustomConfigurations(icebergCatalogConfigPath);
+    CatalogConfig catalogConfig = loadIcebergCatalogConfig(icebergCatalogConfigInput);
+    return catalogConfig;
+  }
+
+  static ConversionSourceProvider<?> getConversionSourceProvider(
+      String conversionProviderConfigPath, DatasetConfig datasetConfig, Configuration hadoopConf)
+      throws IOException {
+    // Process source format
+    String sourceFormat = datasetConfig.sourceFormat;
+    byte[] customConfig = getCustomConfigurations(conversionProviderConfigPath);
+    TableFormatConverters tableFormatConverters = loadTableFormatConversionConfigs(customConfig);
+    TableFormatConverters.ConversionConfig sourceConversionConfig =
+        tableFormatConverters.getTableFormatConverters().get(sourceFormat);
+    if (sourceConversionConfig == null) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Source format %s is not supported. Known source and target formats are %s",
+              sourceFormat, tableFormatConverters.getTableFormatConverters().keySet()));
+    }
+    String sourceProviderClass = sourceConversionConfig.conversionSourceProviderClass;
+    ConversionSourceProvider<?> conversionSourceProvider =
+        ReflectionUtils.createInstanceOfClass(sourceProviderClass);
+    conversionSourceProvider.init(hadoopConf);
+    return conversionSourceProvider;
+  }
+
+  static String getValueFromConfig(CommandLine cmd, String configFlag) {
+    return cmd.getOptionValue(configFlag);
+  }
+
public static void main(String[] args) throws IOException {
CommandLineParser parser = new DefaultParser();
@@ -162,89 +284,24 @@ public class RunSync {
}
private static void runSync(CommandLine cmd) throws IOException {
-    DatasetConfig datasetConfig;
-    try (InputStream inputStream =
-        Files.newInputStream(Paths.get(cmd.getOptionValue(DATASET_CONFIG_OPTION)))) {
-      datasetConfig = YAML_MAPPER.readValue(inputStream, DatasetConfig.class);
-    }
-
-    byte[] customConfig = getCustomConfigurations(cmd, HADOOP_CONFIG_PATH);
-    Configuration hadoopConf = loadHadoopConf(customConfig);
-    byte[] icebergCatalogConfigInput = getCustomConfigurations(cmd, ICEBERG_CATALOG_CONFIG_PATH);
-    IcebergCatalogConfig icebergCatalogConfig = loadIcebergCatalogConfig(icebergCatalogConfigInput);
-
-    String sourceFormat = datasetConfig.sourceFormat;
-    customConfig = getCustomConfigurations(cmd, CONVERTERS_CONFIG_PATH);
-    TableFormatConverters tableFormatConverters = loadTableFormatConversionConfigs(customConfig);
-    TableFormatConverters.ConversionConfig sourceConversionConfig =
-        tableFormatConverters.getTableFormatConverters().get(sourceFormat);
-    if (sourceConversionConfig == null) {
-      throw new IllegalArgumentException(
-          String.format(
-              "Source format %s is not supported. Known source and target formats are %s",
-              sourceFormat, tableFormatConverters.getTableFormatConverters().keySet()));
-    }
-    String sourceProviderClass = sourceConversionConfig.conversionSourceProviderClass;
-    ConversionSourceProvider<?> conversionSourceProvider =
-        ReflectionUtils.createInstanceOfClass(sourceProviderClass);
-    conversionSourceProvider.init(hadoopConf);
-
+    String datasetConfigPath = getValueFromConfig(cmd, DATASET_CONFIG_OPTION);
+    String icebergCatalogConfigPath = getValueFromConfig(cmd, ICEBERG_CATALOG_CONFIG_PATH);
+    String hadoopConfigPath = getValueFromConfig(cmd, HADOOP_CONFIG_PATH);
+    String conversionProviderConfigPath = getValueFromConfig(cmd, CONVERTERS_CONFIG_PATH);
+    DatasetConfig datasetConfig = getDatasetConfig(datasetConfigPath);
+    CatalogConfig catalogConfig = getIcebergCatalogConfig(icebergCatalogConfigPath);
+    Configuration hadoopConf = getHadoopConf(hadoopConfigPath);
+    ConversionSourceProvider conversionSourceProvider =
+        getConversionSourceProvider(conversionProviderConfigPath, datasetConfig, hadoopConf);
List<String> tableFormatList = datasetConfig.getTargetFormats();
-    ConversionController conversionController = new ConversionController(hadoopConf);
-    for (DatasetConfig.Table table : datasetConfig.getDatasets()) {
-      log.info(
-          "Running sync for basePath {} for following table formats {}",
-          table.getTableBasePath(),
-          tableFormatList);
-      Properties sourceProperties = new Properties();
-      if (table.getPartitionSpec() != null) {
-        sourceProperties.put(
-            HudiSourceConfig.PARTITION_FIELD_SPEC_CONFIG, table.getPartitionSpec());
-      }
-      SourceTable sourceTable =
-          SourceTable.builder()
-              .name(table.getTableName())
-              .basePath(table.getTableBasePath())
-              .namespace(table.getNamespace() == null ? null : table.getNamespace().split("\\."))
-              .dataPath(table.getTableDataPath())
-              .catalogConfig(icebergCatalogConfig)
-              .additionalProperties(sourceProperties)
-              .formatName(sourceFormat)
-              .build();
-      List<TargetTable> targetTables =
-          tableFormatList.stream()
-              .map(
-                  tableFormat ->
-                      TargetTable.builder()
-                          .name(table.getTableName())
-                          .basePath(table.getTableBasePath())
-                          .namespace(
-                              table.getNamespace() == null
-                                  ? null
-                                  : table.getNamespace().split("\\."))
-                          .catalogConfig(icebergCatalogConfig)
-                          .formatName(tableFormat)
-                          .build())
-              .collect(Collectors.toList());
-
-      ConversionConfig conversionConfig =
-          ConversionConfig.builder()
-              .sourceTable(sourceTable)
-              .targetTables(targetTables)
-              .syncMode(SyncMode.INCREMENTAL)
-              .build();
-      try {
-        conversionController.sync(conversionConfig, conversionSourceProvider);
-      } catch (Exception e) {
-        log.error("Error running sync for {}", table.getTableBasePath(), e);
-      }
-    }
+    syncTableMetadata(
+        datasetConfig, tableFormatList, catalogConfig, hadoopConf, conversionSourceProvider);
}
-  static byte[] getCustomConfigurations(CommandLine cmd, String option) throws IOException {
+  static byte[] getCustomConfigurations(String configPath) throws IOException {
     byte[] customConfig = null;
-    if (cmd.hasOption(option)) {
-      customConfig = Files.readAllBytes(Paths.get(cmd.getOptionValue(option)));
+    if (configPath != null) {
+      customConfig = Files.readAllBytes(Paths.get(configPath));
}
return customConfig;
}
@@ -282,7 +339,7 @@ public class RunSync {
}
@VisibleForTesting
-  static IcebergCatalogConfig loadIcebergCatalogConfig(byte[] customConfigs) throws IOException {
+  static CatalogConfig loadIcebergCatalogConfig(byte[] customConfigs) throws IOException {
return customConfigs == null
? null
: YAML_MAPPER.readValue(customConfigs, IcebergCatalogConfig.class);
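
Since the extracted helpers are static and path-based, same-package callers (like the new tests) can drive a sync without touching CLI parsing. A minimal sketch, assuming the null-path defaults that testMain relies on; configPath is a placeholder for a dataset YAML:

    DatasetConfig datasetConfig = RunSync.getDatasetConfig(configPath);
    Configuration hadoopConf = RunSync.getHadoopConf(null);              // null -> default Hadoop configs
    CatalogConfig catalogConfig = RunSync.getIcebergCatalogConfig(null); // null -> no catalog config
    ConversionSourceProvider<?> provider =
        RunSync.getConversionSourceProvider(null, datasetConfig, hadoopConf); // null -> converter defaults
    RunSync.syncTableMetadata(
        datasetConfig, datasetConfig.getTargetFormats(), catalogConfig, hadoopConf, provider);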
diff --git a/xtable-utilities/src/test/java/org/apache/xtable/utilities/TestRunSync.java b/xtable-utilities/src/test/java/org/apache/xtable/utilities/TestRunSync.java
index 20a190e8..273854a0 100644
--- a/xtable-utilities/src/test/java/org/apache/xtable/utilities/TestRunSync.java
+++ b/xtable-utilities/src/test/java/org/apache/xtable/utilities/TestRunSync.java
@@ -23,18 +23,42 @@ import static org.apache.xtable.model.storage.TableFormat.HUDI;
import static org.apache.xtable.model.storage.TableFormat.ICEBERG;
import java.io.IOException;
+import java.net.URL;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
-import org.apache.xtable.iceberg.IcebergCatalogConfig;
+import org.apache.xtable.conversion.CatalogConfig;
+import org.apache.xtable.utilities.RunSync.DatasetConfig;
import org.apache.xtable.utilities.RunSync.TableFormatConverters;
import org.apache.xtable.utilities.RunSync.TableFormatConverters.ConversionConfig;
class TestRunSync {
+  @Test
+  public void testMain() {
+    String filePath =
+        TestRunSync.class.getClassLoader().getResource("my_config.yaml").getPath();
+    String[] args = new String[] {"--datasetConfig", filePath};
+    Assertions.assertDoesNotThrow(
+        () -> RunSync.main(args), "RunSync.main() threw an unexpected exception.");
+  }
+
+  @Test
+  public void testGetDatasetConfigWithNonExistentFile() {
+    URL resourceUrl = TestRunSync.class.getClassLoader().getResource("my_config1.yaml");
+    Assertions.assertNull(resourceUrl, "Config file not found in classpath");
+  }
+
+  @Test
+  public void testGetDatasetConfigWithValidYAML() throws IOException {
+    String filePath =
+        TestRunSync.class.getClassLoader().getResource("my_config.yaml").getPath();
+    DatasetConfig config = RunSync.getDatasetConfig(filePath);
+    // Assert
+    Assertions.assertNotNull(config);
+  }
+
/** Tests that the default hadoop configs are loaded. */
@Test
public void testLoadDefaultHadoopConfig() {
@@ -142,7 +166,7 @@ class TestRunSync {
+ "catalogOptions: \n"
+ " option1: value1\n"
+ " option2: value2";
-    IcebergCatalogConfig catalogConfig = RunSync.loadIcebergCatalogConfig(icebergConfig.getBytes());
+    CatalogConfig catalogConfig = RunSync.loadIcebergCatalogConfig(icebergConfig.getBytes());
     Assertions.assertEquals("org.apache.xtable.CatalogImpl", catalogConfig.getCatalogImpl());
Assertions.assertEquals("test", catalogConfig.getCatalogName());
Assertions.assertEquals(2, catalogConfig.getCatalogOptions().size());
diff --git a/xtable-utilities/src/test/resources/my_config.yaml b/xtable-utilities/src/test/resources/my_config.yaml
new file mode 100644
index 00000000..1416c04c
--- /dev/null
+++ b/xtable-utilities/src/test/resources/my_config.yaml
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+sourceFormat: ICEBERG
+targetFormats:
+ - DELTA
+datasets:
+ -
+ tableBasePath: /Desktop/opensource/iceberg/warehouse/demo/nyc/taxis
+ tableDataPath: /Desktop/opensource/iceberg/warehouse/demo/nyc/taxis/data
+ tableName: taxis
\ No newline at end of file
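
The new resource doubles as a usage example: testMain passes it to the CLI entry point with only --datasetConfig set, so the Hadoop, Iceberg catalog, and converter configs all fall back to their defaults. Equivalently (resource path relative to the repo root):

    RunSync.main(new String[] {"--datasetConfig", "xtable-utilities/src/test/resources/my_config.yaml"});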