This is an automated email from the ASF dual-hosted git repository.

timbrown pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git


The following commit(s) were added to refs/heads/main by this push:
     new f991e315 [bug] Parquet Source: snapshot sync fails on multiple commits 
with partitions (#806)
f991e315 is described below

commit f991e315407b68f31324503fec465a95632ffc3b
Author: Tim Brown <[email protected]>
AuthorDate: Mon Feb 23 11:40:07 2026 -0800

    [bug] Parquet Source: snapshot sync fails on multiple commits with 
partitions (#806)
    
    * make 2 commits, cleanup testing
    
    * fix imports
    
    * reduce diff
    
    * address comments
---
 .../xtable/parquet/ITParquetConversionSource.java  | 227 +++++++++------------
 1 file changed, 94 insertions(+), 133 deletions(-)

diff --git 
a/xtable-core/src/test/java/org/apache/xtable/parquet/ITParquetConversionSource.java
 
b/xtable-core/src/test/java/org/apache/xtable/parquet/ITParquetConversionSource.java
index 530429ce..2ffa579a 100644
--- 
a/xtable-core/src/test/java/org/apache/xtable/parquet/ITParquetConversionSource.java
+++ 
b/xtable-core/src/test/java/org/apache/xtable/parquet/ITParquetConversionSource.java
@@ -26,7 +26,6 @@ import static 
org.apache.xtable.model.storage.TableFormat.PARQUET;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 
-import java.net.URISyntaxException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.sql.Timestamp;
@@ -92,7 +91,6 @@ public class ITParquetConversionSource {
     sparkConf.set("parquet.avro.write-old-list-structure", "false");
     sparkConf.set("spark.sql.parquet.writeLegacyFormat", "false");
     sparkConf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS");
-
     sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();
     jsc = JavaSparkContext.fromSparkContext(sparkSession.sparkContext());
   }
@@ -108,27 +106,19 @@ public class ITParquetConversionSource {
       sparkSession = null;
     }
   }
-  // delimiter must be / and not - or any other one
-  private static Stream<Arguments> provideArgsForFilePartitionTesting() {
-    String partitionConfig = // "timestamp:YEAR:year=yyyy";
-        "timestamp:MONTH:year=yyyy/month=MM"; // or 
"timestamp:YEAR:year=yyyy", or //
-    // timestamp:DAY:year=yyyy/month=MM/day=dd
-    return Stream.of(
-        Arguments.of(
-            buildArgsForPartition(
-                PARQUET, Arrays.asList(ICEBERG, DELTA, HUDI), partitionConfig, 
partitionConfig)));
-  }
 
   private static TableFormatPartitionDataHolder buildArgsForPartition(
       String sourceFormat,
       List<String> targetFormats,
       String hudiPartitionConfig,
-      String xTablePartitionConfig) {
+      String xTablePartitionConfig,
+      SyncMode syncMode) {
     return TableFormatPartitionDataHolder.builder()
         .sourceTableFormat(sourceFormat)
         .targetTableFormats(targetFormats)
         .hudiSourceConfig(Optional.ofNullable(hudiPartitionConfig))
         .xTablePartitionConfig(xTablePartitionConfig)
+        .syncMode(syncMode)
         .build();
   }
 
@@ -160,8 +150,7 @@ public class ITParquetConversionSource {
                     TargetTable.builder()
                         .name(tableName)
                         .formatName(formatName)
-                        // set the metadata path to the data path as the 
default (required by Hudi)
-                        .basePath(table.getDataPath())
+                        .basePath(table.getBasePath())
                         .metadataRetention(metadataRetention)
                         .build())
             .collect(Collectors.toList());
@@ -173,12 +162,21 @@ public class ITParquetConversionSource {
         .build();
   }
 
-  private static Stream<Arguments> provideArgsForFileNonPartitionTesting() {
-    String partitionConfig = null;
-    return Stream.of(
-        Arguments.of(
-            buildArgsForPartition(
-                PARQUET, Arrays.asList(ICEBERG, DELTA, HUDI), partitionConfig, 
partitionConfig)));
+  private static Stream<Arguments> provideArgsForSyncTesting() {
+    List<String> partitionConfigs = Arrays.asList(null, 
"timestamp:MONTH:year=yyyy/month=MM");
+    return partitionConfigs.stream()
+        .flatMap(
+            partitionConfig ->
+                Stream.of(SyncMode.FULL) // Incremental sync is not yet 
supported
+                    .map(
+                        syncMode ->
+                            Arguments.of(
+                                buildArgsForPartition(
+                                    PARQUET,
+                                    Arrays.asList(ICEBERG, DELTA, HUDI),
+                                    partitionConfig,
+                                    partitionConfig,
+                                    syncMode))));
   }
 
   private ConversionSourceProvider<?> getConversionSourceProvider(String 
sourceTableFormat) {
@@ -193,9 +191,8 @@ public class ITParquetConversionSource {
   }
 
   @ParameterizedTest
-  @MethodSource("provideArgsForFileNonPartitionTesting")
-  public void testFileNonPartitionedData(
-      TableFormatPartitionDataHolder tableFormatPartitionDataHolder) throws 
URISyntaxException {
+  @MethodSource("provideArgsForSyncTesting")
+  void testSync(TableFormatPartitionDataHolder tableFormatPartitionDataHolder) 
{
     String tableName = getTableName();
     String sourceTableFormat = 
tableFormatPartitionDataHolder.getSourceTableFormat();
     List<String> targetTableFormats = 
tableFormatPartitionDataHolder.getTargetTableFormats();
@@ -229,17 +226,24 @@ public class ITParquetConversionSource {
                   new MetadataBuilder().putString("precision", 
"millis").build())
             });
     Dataset<Row> df = sparkSession.createDataFrame(data, schema);
-    String dataPath = tempDir.toAbsolutePath().toString() + 
"/non_partitioned_data";
-    df.write().mode(SaveMode.Overwrite).parquet(dataPath);
-    GenericTable table;
-    table =
+    String dataPath =
+        tempDir
+            .resolve(
+                (xTablePartitionConfig == null ? "non_partitioned_data_" : 
"partitioned_data_")
+                    + tableFormatPartitionDataHolder.getSyncMode())
+            .toString();
+
+    writeData(df, dataPath, xTablePartitionConfig);
+    boolean isPartitioned = xTablePartitionConfig != null;
+
+    Path pathForXTable = Paths.get(dataPath);
+    try (GenericTable table =
         GenericTable.getInstance(
-            tableName, Paths.get(dataPath), sparkSession, jsc, 
sourceTableFormat, false);
-    try (GenericTable tableToClose = table) {
+            tableName, pathForXTable, sparkSession, jsc, sourceTableFormat, 
isPartitioned)) {
       ConversionConfig conversionConfig =
           getTableSyncConfig(
               sourceTableFormat,
-              SyncMode.FULL,
+              tableFormatPartitionDataHolder.getSyncMode(),
               tableName,
               table,
               targetTableFormats,
@@ -248,77 +252,12 @@ public class ITParquetConversionSource {
       ConversionController conversionController =
           new ConversionController(jsc.hadoopConfiguration());
       conversionController.sync(conversionConfig, conversionSourceProvider);
-      checkDatasetEquivalenceWithFilter(sourceTableFormat, tableToClose, 
targetTableFormats, false);
-    }
-  }
-
-  @ParameterizedTest
-  @MethodSource("provideArgsForFilePartitionTesting")
-  public void testFilePartitionedData(TableFormatPartitionDataHolder 
tableFormatPartitionDataHolder)
-      throws URISyntaxException {
-    String tableName = getTableName();
-    String sourceTableFormat = 
tableFormatPartitionDataHolder.getSourceTableFormat();
-    List<String> targetTableFormats = 
tableFormatPartitionDataHolder.getTargetTableFormats();
-    String xTablePartitionConfig = 
tableFormatPartitionDataHolder.getXTablePartitionConfig();
-    ConversionSourceProvider<?> conversionSourceProvider =
-        getConversionSourceProvider(sourceTableFormat);
-    // create the data
-    List<Row> data =
-        Arrays.asList(
-            RowFactory.create(1, "Alice", true, 30.1, new 
Timestamp(System.currentTimeMillis())),
-            RowFactory.create(
-                2, "Bob", false, 24.6, new 
Timestamp(System.currentTimeMillis() + 1000)),
-            RowFactory.create(
-                3, "Charlie", true, 35.2, new 
Timestamp(System.currentTimeMillis() + 2000)),
-            RowFactory.create(
-                4, "David", false, 29.5, new 
Timestamp(System.currentTimeMillis() + 3000)),
-            RowFactory.create(
-                5, "Eve", true, 22.2, new Timestamp(System.currentTimeMillis() 
+ 4000)));
+      checkDatasetEquivalenceWithFilter(
+          sourceTableFormat, table, targetTableFormats, isPartitioned);
 
-    schema =
-        DataTypes.createStructType(
-            new StructField[] {
-              DataTypes.createStructField("id", DataTypes.IntegerType, false),
-              DataTypes.createStructField("name", DataTypes.StringType, false),
-              DataTypes.createStructField("hasSiblings", 
DataTypes.BooleanType, false),
-              DataTypes.createStructField("age", DataTypes.DoubleType, false),
-              DataTypes.createStructField(
-                  "timestamp",
-                  DataTypes.TimestampType,
-                  false,
-                  new MetadataBuilder().putString("precision", 
"millis").build())
-            });
-    Dataset<Row> df = sparkSession.createDataFrame(data, schema);
-    String dataPathPart = tempDir.toAbsolutePath() + "/partitioned_data";
-    df.withColumn("year", 
functions.year(functions.col("timestamp").cast(DataTypes.TimestampType)))
-        .withColumn(
-            "month",
-            
functions.date_format(functions.col("timestamp").cast(DataTypes.TimestampType), 
"MM"))
-        .write()
-        .mode(SaveMode.Overwrite)
-        .partitionBy("year", "month")
-        .parquet(dataPathPart);
-    GenericTable table;
-    table =
-        GenericTable.getInstance(
-            tableName, Paths.get(dataPathPart), sparkSession, jsc, 
sourceTableFormat, true);
-    try (GenericTable tableToClose = table) {
-      ConversionConfig conversionConfig =
-          getTableSyncConfig(
-              sourceTableFormat,
-              SyncMode.FULL,
-              tableName,
-              table,
-              targetTableFormats,
-              xTablePartitionConfig,
-              null);
-      ConversionController conversionController =
-          new ConversionController(jsc.hadoopConfiguration());
-      conversionController.sync(conversionConfig, conversionSourceProvider);
-      checkDatasetEquivalenceWithFilter(sourceTableFormat, tableToClose, 
targetTableFormats, true);
       // update the current parquet file data with another attribute the sync 
again
       List<Row> dataToAppend =
-          Arrays.asList(
+          Collections.singletonList(
               RowFactory.create(
                   10,
                   "BobAppended",
@@ -327,34 +266,56 @@ public class ITParquetConversionSource {
                   new Timestamp(System.currentTimeMillis() + 1500)));
 
       Dataset<Row> dfAppend = sparkSession.createDataFrame(dataToAppend, 
schema);
-      dfAppend
-          .withColumn(
-              "year", 
functions.year(functions.col("timestamp").cast(DataTypes.TimestampType)))
-          .withColumn(
-              "month",
-              
functions.date_format(functions.col("timestamp").cast(DataTypes.TimestampType), 
"MM"))
-          .write()
-          .mode(SaveMode.Append)
-          .partitionBy("year", "month")
-          .parquet(dataPathPart);
-      GenericTable tableAppend;
-      tableAppend =
-          GenericTable.getInstance(
-              tableName, Paths.get(dataPathPart), sparkSession, jsc, 
sourceTableFormat, true);
-      try (GenericTable tableToCloseAppended = tableAppend) {
-        ConversionConfig conversionConfigAppended =
-            getTableSyncConfig(
-                sourceTableFormat,
-                SyncMode.FULL,
-                tableName,
-                tableAppend,
-                targetTableFormats,
-                xTablePartitionConfig,
-                null);
-        ConversionController conversionControllerAppended =
-            new ConversionController(jsc.hadoopConfiguration());
-        conversionControllerAppended.sync(conversionConfigAppended, 
conversionSourceProvider);
+      writeData(dfAppend, dataPath, xTablePartitionConfig);
+      ConversionConfig conversionConfigAppended =
+          getTableSyncConfig(
+              sourceTableFormat,
+              tableFormatPartitionDataHolder.getSyncMode(),
+              tableName,
+              table,
+              targetTableFormats,
+              xTablePartitionConfig,
+              null);
+      ConversionController conversionControllerAppended =
+          new ConversionController(jsc.hadoopConfiguration());
+      conversionControllerAppended.sync(conversionConfigAppended, 
conversionSourceProvider);
+      checkDatasetEquivalenceWithFilter(
+          sourceTableFormat, table, targetTableFormats, isPartitioned);
+    }
+  }
+
+  private void writeData(Dataset<Row> df, String dataPath, String 
partitionConfig) {
+    if (partitionConfig != null) {
+      // extract partition columns from config
+      String[] partitionCols =
+          Arrays.stream(partitionConfig.split(":")[2].split("/"))
+              .map(s -> s.split("=")[0])
+              .toArray(String[]::new);
+      // add partition columns to dataframe
+      for (String partitionCol : partitionCols) {
+        if (partitionCol.equals("year")) {
+          df =
+              df.withColumn(
+                  "year", 
functions.year(functions.col("timestamp").cast(DataTypes.TimestampType)));
+        } else if (partitionCol.equals("month")) {
+          df =
+              df.withColumn(
+                  "month",
+                  functions.date_format(
+                      
functions.col("timestamp").cast(DataTypes.TimestampType), "MM"));
+        } else if (partitionCol.equals("day")) {
+          df =
+              df.withColumn(
+                  "day",
+                  functions.date_format(
+                      
functions.col("timestamp").cast(DataTypes.TimestampType), "dd"));
+        } else {
+          throw new IllegalArgumentException("Unsupported partition column: " 
+ partitionCol);
+        }
       }
+      
df.write().mode(SaveMode.Append).partitionBy(partitionCols).parquet(dataPath);
+    } else {
+      df.write().mode(SaveMode.Append).parquet(dataPath);
     }
   }
 
@@ -362,8 +323,7 @@ public class ITParquetConversionSource {
       String sourceFormat,
       GenericTable<?, ?> sourceTable,
       List<String> targetFormats,
-      boolean isPartitioned)
-      throws URISyntaxException {
+      boolean isPartitioned) {
     checkDatasetEquivalence(
         sourceFormat,
         sourceTable,
@@ -381,8 +341,7 @@ public class ITParquetConversionSource {
       List<String> targetFormats,
       Map<String, Map<String, String>> targetOptions,
       Integer expectedCount,
-      boolean isPartitioned)
-      throws URISyntaxException {
+      boolean isPartitioned) {
     Dataset<Row> sourceRows =
         sparkSession
             .read()
@@ -390,7 +349,8 @@ public class ITParquetConversionSource {
             .options(sourceOptions)
             .option("recursiveFileLookup", "true")
             .option("pathGlobFilter", "*.parquet")
-            .parquet(sourceTable.getDataPath());
+            .parquet(sourceTable.getDataPath())
+            .orderBy("id"); // order by id to ensure deterministic order for 
comparison
     Map<String, Dataset<Row>> targetRowsByFormat =
         targetFormats.stream()
             .collect(
@@ -409,7 +369,8 @@ public class ITParquetConversionSource {
                           .read()
                           .options(finalTargetOptions)
                           .format(targetFormat.toLowerCase())
-                          .load(sourceTable.getDataPath());
+                          .load(sourceTable.getDataPath())
+                          .orderBy("id");
                     }));
 
     String[] selectColumnsArr = schema.fieldNames();
@@ -422,7 +383,6 @@ public class ITParquetConversionSource {
       String format = entry.getKey();
 
       Dataset<Row> targetRows = entry.getValue();
-      targetRows.show();
 
       List<String> dataset2Rows = 
targetRows.selectExpr(selectColumnsArr).toJSON().collectAsList();
 
@@ -466,5 +426,6 @@ public class ITParquetConversionSource {
     String xTablePartitionConfig;
     Optional<String> hudiSourceConfig;
     String filter;
+    SyncMode syncMode;
   }
 }

Reply via email to