This is an automated email from the ASF dual-hosted git repository.

vinish pushed a commit to branch IcebergSourceFix
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git

commit 1137757bf80ebcf378b5eed9f48e62ba26376a06
Author: Vinish Reddy <[email protected]>
AuthorDate: Fri Dec 13 18:50:13 2024 -0800

    Normalize basePath in targetTables in ConversionController
---
 .../xtable/conversion/ConversionController.java    |   2 +-
 .../apache/xtable/conversion/ConversionUtils.java  |  57 +++++++++++
 .../org/apache/xtable/ITConversionController.java  |   3 +-
 .../xtable/conversion/TestConversionUtils.java     | 111 +++++++++++++++++++++
 4 files changed, 170 insertions(+), 3 deletions(-)

diff --git 
a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java
 
b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java
index dc665969..bc5f5e02 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java
@@ -85,7 +85,7 @@ public class ConversionController {
     if (config.getTargetTables() == null || 
config.getTargetTables().isEmpty()) {
       throw new IllegalArgumentException("Please provide at-least one format 
to sync");
     }
-
+    config = ConversionUtils.normalizeTargetPaths(config);
     try (ConversionSource<COMMIT> conversionSource =
         
conversionSourceProvider.getConversionSourceInstance(config.getSourceTable())) {
       ExtractFromSource<COMMIT> source = 
ExtractFromSource.of(conversionSource);
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionUtils.java 
b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionUtils.java
new file mode 100644
index 00000000..fdeedc9b
--- /dev/null
+++ 
b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionUtils.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.conversion;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.xtable.model.storage.TableFormat;
+
+public class ConversionUtils {
+
+  /**
+   * Re-bases Hudi and Delta targets onto the source data path when the source is an Iceberg table
+   * whose data path differs from its base path.
+   *
+   * <p>Few table formats need the metadata to be located at the root level of the data files. Eg:
+   * An iceberg table generated through spark will have two directories, basePath/data and
+   * basePath/metadata. For synchronising the iceberg metadata to hudi and delta, their metadata
+   * needs to be present in basePath/data/.hoodie and basePath/data/_delta_log.
+   *
+   * <p>Targets in any other format keep their configured base path and remain part of the
+   * returned config.
+   *
+   * @param config conversion config for synchronizing source and target tables
+   * @return the same config instance when no re-basing applies, otherwise a new config built from
+   *     the source table, the adjusted target tables and the sync mode of {@code config}
+   */
+  public static ConversionConfig normalizeTargetPaths(ConversionConfig config) {
+    SourceTable sourceTable = config.getSourceTable();
+    boolean icebergWithSeparateDataPath =
+        !sourceTable.getDataPath().equals(sourceTable.getBasePath())
+            && sourceTable.getFormatName().equals(TableFormat.ICEBERG);
+    if (!icebergWithSeparateDataPath) {
+      return config;
+    }
+    // Map instead of filter: a target that does not need re-basing must still be synced, so it is
+    // passed through untouched rather than being removed from the list.
+    List<TargetTable> updatedTargetTables =
+        config.getTargetTables().stream()
+            .map(
+                targetTable ->
+                    needsMetadataAtDataRoot(targetTable)
+                        ? targetTable.toBuilder().basePath(sourceTable.getDataPath()).build()
+                        : targetTable)
+            .collect(Collectors.toList());
+    return new ConversionConfig(sourceTable, updatedTargetTables, config.getSyncMode());
+  }
+
+  /** Returns true for the target formats (Hudi, Delta) that are re-based onto the data path. */
+  private static boolean needsMetadataAtDataRoot(TargetTable targetTable) {
+    return targetTable.getFormatName().equals(TableFormat.HUDI)
+        || targetTable.getFormatName().equals(TableFormat.DELTA);
+  }
+}
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java 
b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
index 3d539766..b5ffcdf1 100644
--- a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
+++ b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
@@ -1013,8 +1013,7 @@ public class ITConversionController {
                     TargetTable.builder()
                         .name(tableName)
                         .formatName(formatName)
-                        // set the metadata path to the data path as the 
default (required by Hudi)
-                        .basePath(table.getDataPath())
+                        .basePath(table.getBasePath())
                         .metadataRetention(metadataRetention)
                         .build())
             .collect(Collectors.toList());
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionUtils.java
 
b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionUtils.java
new file mode 100644
index 00000000..b1044039
--- /dev/null
+++ 
b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionUtils.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.conversion;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import java.util.Arrays;
+
+import org.junit.jupiter.api.Test;
+
+import org.apache.xtable.model.storage.TableFormat;
+import org.apache.xtable.model.sync.SyncMode;
+
+class TestConversionUtils {
+
+  private static final String TABLE_NAME = "table_name";
+  private static final String BASE_PATH = "/tmp/basePath";
+  private static final String DATA_PATH = "/tmp/basePath/data";
+
+  /** An Iceberg source with a separate data path moves Delta and Hudi targets to that path. */
+  @Test
+  void testNormalizeTargetPaths() {
+    SourceTable icebergSource =
+        SourceTable.builder()
+            .name(TABLE_NAME)
+            .formatName(TableFormat.ICEBERG)
+            .basePath(BASE_PATH)
+            .dataPath(DATA_PATH)
+            .build();
+
+    TargetTable deltaAtBasePath =
+        TargetTable.builder()
+            .name(TABLE_NAME)
+            .basePath(BASE_PATH)
+            .formatName(TableFormat.DELTA)
+            .build();
+    TargetTable hudiAtBasePath =
+        TargetTable.builder()
+            .name(TABLE_NAME)
+            .basePath(BASE_PATH)
+            .formatName(TableFormat.HUDI)
+            .build();
+    ConversionConfig input =
+        ConversionConfig.builder()
+            .sourceTable(icebergSource)
+            .syncMode(SyncMode.FULL)
+            .targetTables(Arrays.asList(deltaAtBasePath, hudiAtBasePath))
+            .build();
+
+    TargetTable deltaAtDataPath =
+        TargetTable.builder()
+            .name(TABLE_NAME)
+            .basePath(DATA_PATH)
+            .formatName(TableFormat.DELTA)
+            .build();
+    TargetTable hudiAtDataPath =
+        TargetTable.builder()
+            .name(TABLE_NAME)
+            .basePath(DATA_PATH)
+            .formatName(TableFormat.HUDI)
+            .build();
+    ConversionConfig expected =
+        ConversionConfig.builder()
+            .sourceTable(icebergSource)
+            .syncMode(SyncMode.FULL)
+            .targetTables(Arrays.asList(deltaAtDataPath, hudiAtDataPath))
+            .build();
+
+    assertEquals(expected, ConversionUtils.normalizeTargetPaths(input));
+  }
+
+  /** A Hudi source built without an explicit data path leaves the config as it was. */
+  @Test
+  void testNormalizeTargetPathsNoOp() {
+    SourceTable hudiSource =
+        SourceTable.builder()
+            .name(TABLE_NAME)
+            .formatName(TableFormat.HUDI)
+            .basePath(BASE_PATH)
+            .build();
+    TargetTable icebergTarget =
+        TargetTable.builder()
+            .name(TABLE_NAME)
+            .basePath(BASE_PATH)
+            .formatName(TableFormat.ICEBERG)
+            .build();
+    TargetTable deltaTarget =
+        TargetTable.builder()
+            .name(TABLE_NAME)
+            .basePath(BASE_PATH)
+            .formatName(TableFormat.DELTA)
+            .build();
+    ConversionConfig input =
+        ConversionConfig.builder()
+            .sourceTable(hudiSource)
+            .syncMode(SyncMode.FULL)
+            .targetTables(Arrays.asList(icebergTarget, deltaTarget))
+            .build();
+
+    assertEquals(input, ConversionUtils.normalizeTargetPaths(input));
+  }
+}

Reply via email to