This is an automated email from the ASF dual-hosted git repository.
timbrown pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git
The following commit(s) were added to refs/heads/main by this push:
new f991e315 [bug] Parquet Source: snapshot sync fails on multiple commits with partitions (#806)
f991e315 is described below
commit f991e315407b68f31324503fec465a95632ffc3b
Author: Tim Brown <[email protected]>
AuthorDate: Mon Feb 23 11:40:07 2026 -0800
[bug] Parquet Source: snapshot sync fails on multiple commits with partitions (#806)
* make 2 commits, cleanup testing
* fix imports
* reduce diff
* address comments
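For context on the change below: the consolidated test writes data through a new
writeData helper that derives the Spark partitionBy columns from the XTable
partition config string (e.g. "timestamp:MONTH:year=yyyy/month=MM"). A minimal
standalone sketch of that parsing, illustrative only (class and method names are
hypothetical, not part of this commit):

    import java.util.Arrays;

    public class PartitionConfigSketch {
      // Extract partition column names ("year", "month") from a config like
      // "timestamp:MONTH:year=yyyy/month=MM": the third ':'-separated field
      // holds '/'-separated "column=format" pairs.
      static String[] partitionColumns(String partitionConfig) {
        return Arrays.stream(partitionConfig.split(":")[2].split("/"))
            .map(s -> s.split("=")[0])
            .toArray(String[]::new);
      }

      public static void main(String[] args) {
        // prints [year, month]
        System.out.println(
            Arrays.toString(partitionColumns("timestamp:MONTH:year=yyyy/month=MM")));
      }
    }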
---
.../xtable/parquet/ITParquetConversionSource.java | 227 +++++++++------------
1 file changed, 94 insertions(+), 133 deletions(-)
diff --git a/xtable-core/src/test/java/org/apache/xtable/parquet/ITParquetConversionSource.java b/xtable-core/src/test/java/org/apache/xtable/parquet/ITParquetConversionSource.java
index 530429ce..2ffa579a 100644
--- a/xtable-core/src/test/java/org/apache/xtable/parquet/ITParquetConversionSource.java
+++ b/xtable-core/src/test/java/org/apache/xtable/parquet/ITParquetConversionSource.java
@@ -26,7 +26,6 @@ import static org.apache.xtable.model.storage.TableFormat.PARQUET;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
-import java.net.URISyntaxException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.Timestamp;
@@ -92,7 +91,6 @@ public class ITParquetConversionSource {
sparkConf.set("parquet.avro.write-old-list-structure", "false");
sparkConf.set("spark.sql.parquet.writeLegacyFormat", "false");
sparkConf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS");
-
sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();
jsc = JavaSparkContext.fromSparkContext(sparkSession.sparkContext());
}
@@ -108,27 +106,19 @@ public class ITParquetConversionSource {
sparkSession = null;
}
}
- // delimiter must be / and not - or any other one
- private static Stream<Arguments> provideArgsForFilePartitionTesting() {
- String partitionConfig = // "timestamp:YEAR:year=yyyy";
- "timestamp:MONTH:year=yyyy/month=MM"; // or
"timestamp:YEAR:year=yyyy", or //
- // timestamp:DAY:year=yyyy/month=MM/day=dd
- return Stream.of(
- Arguments.of(
- buildArgsForPartition(
- PARQUET, Arrays.asList(ICEBERG, DELTA, HUDI), partitionConfig, partitionConfig)));
- }
private static TableFormatPartitionDataHolder buildArgsForPartition(
String sourceFormat,
List<String> targetFormats,
String hudiPartitionConfig,
- String xTablePartitionConfig) {
+ String xTablePartitionConfig,
+ SyncMode syncMode) {
return TableFormatPartitionDataHolder.builder()
.sourceTableFormat(sourceFormat)
.targetTableFormats(targetFormats)
.hudiSourceConfig(Optional.ofNullable(hudiPartitionConfig))
.xTablePartitionConfig(xTablePartitionConfig)
+ .syncMode(syncMode)
.build();
}
@@ -160,8 +150,7 @@ public class ITParquetConversionSource {
TargetTable.builder()
.name(tableName)
.formatName(formatName)
- // set the metadata path to the data path as the default (required by Hudi)
- .basePath(table.getDataPath())
+ .basePath(table.getBasePath())
.metadataRetention(metadataRetention)
.build())
.collect(Collectors.toList());
@@ -173,12 +162,21 @@ public class ITParquetConversionSource {
.build();
}
- private static Stream<Arguments> provideArgsForFileNonPartitionTesting() {
- String partitionConfig = null;
- return Stream.of(
- Arguments.of(
- buildArgsForPartition(
- PARQUET, Arrays.asList(ICEBERG, DELTA, HUDI), partitionConfig, partitionConfig)));
+ private static Stream<Arguments> provideArgsForSyncTesting() {
+ List<String> partitionConfigs = Arrays.asList(null, "timestamp:MONTH:year=yyyy/month=MM");
+ return partitionConfigs.stream()
+ .flatMap(
+ partitionConfig ->
+ Stream.of(SyncMode.FULL) // Incremental sync is not yet supported
+ .map(
+ syncMode ->
+ Arguments.of(
+ buildArgsForPartition(
+ PARQUET,
+ Arrays.asList(ICEBERG, DELTA, HUDI),
+ partitionConfig,
+ partitionConfig,
+ syncMode))));
}
private ConversionSourceProvider<?> getConversionSourceProvider(String sourceTableFormat) {
@@ -193,9 +191,8 @@ public class ITParquetConversionSource {
}
@ParameterizedTest
- @MethodSource("provideArgsForFileNonPartitionTesting")
- public void testFileNonPartitionedData(
- TableFormatPartitionDataHolder tableFormatPartitionDataHolder) throws URISyntaxException {
+ @MethodSource("provideArgsForSyncTesting")
+ void testSync(TableFormatPartitionDataHolder tableFormatPartitionDataHolder) {
String tableName = getTableName();
String sourceTableFormat = tableFormatPartitionDataHolder.getSourceTableFormat();
List<String> targetTableFormats = tableFormatPartitionDataHolder.getTargetTableFormats();
@@ -229,17 +226,24 @@ public class ITParquetConversionSource {
new MetadataBuilder().putString("precision", "millis").build())
});
Dataset<Row> df = sparkSession.createDataFrame(data, schema);
- String dataPath = tempDir.toAbsolutePath().toString() + "/non_partitioned_data";
- df.write().mode(SaveMode.Overwrite).parquet(dataPath);
- GenericTable table;
- table =
+ String dataPath =
+ tempDir
+ .resolve(
+ (xTablePartitionConfig == null ? "non_partitioned_data_" : "partitioned_data_")
+ + tableFormatPartitionDataHolder.getSyncMode())
+ .toString();
+
+ writeData(df, dataPath, xTablePartitionConfig);
+ boolean isPartitioned = xTablePartitionConfig != null;
+
+ Path pathForXTable = Paths.get(dataPath);
+ try (GenericTable table =
GenericTable.getInstance(
- tableName, Paths.get(dataPath), sparkSession, jsc, sourceTableFormat, false);
- try (GenericTable tableToClose = table) {
+ tableName, pathForXTable, sparkSession, jsc, sourceTableFormat, isPartitioned)) {
ConversionConfig conversionConfig =
getTableSyncConfig(
sourceTableFormat,
- SyncMode.FULL,
+ tableFormatPartitionDataHolder.getSyncMode(),
tableName,
table,
targetTableFormats,
@@ -248,77 +252,12 @@ public class ITParquetConversionSource {
ConversionController conversionController =
new ConversionController(jsc.hadoopConfiguration());
conversionController.sync(conversionConfig, conversionSourceProvider);
- checkDatasetEquivalenceWithFilter(sourceTableFormat, tableToClose, targetTableFormats, false);
- }
- }
-
- @ParameterizedTest
- @MethodSource("provideArgsForFilePartitionTesting")
- public void testFilePartitionedData(TableFormatPartitionDataHolder tableFormatPartitionDataHolder)
- throws URISyntaxException {
- String tableName = getTableName();
- String sourceTableFormat = tableFormatPartitionDataHolder.getSourceTableFormat();
- List<String> targetTableFormats = tableFormatPartitionDataHolder.getTargetTableFormats();
- String xTablePartitionConfig = tableFormatPartitionDataHolder.getXTablePartitionConfig();
- ConversionSourceProvider<?> conversionSourceProvider =
- getConversionSourceProvider(sourceTableFormat);
- // create the data
- List<Row> data =
- Arrays.asList(
- RowFactory.create(1, "Alice", true, 30.1, new
Timestamp(System.currentTimeMillis())),
- RowFactory.create(
- 2, "Bob", false, 24.6, new
Timestamp(System.currentTimeMillis() + 1000)),
- RowFactory.create(
- 3, "Charlie", true, 35.2, new
Timestamp(System.currentTimeMillis() + 2000)),
- RowFactory.create(
- 4, "David", false, 29.5, new
Timestamp(System.currentTimeMillis() + 3000)),
- RowFactory.create(
- 5, "Eve", true, 22.2, new Timestamp(System.currentTimeMillis()
+ 4000)));
+ checkDatasetEquivalenceWithFilter(
+ sourceTableFormat, table, targetTableFormats, isPartitioned);
- schema =
- DataTypes.createStructType(
- new StructField[] {
- DataTypes.createStructField("id", DataTypes.IntegerType, false),
- DataTypes.createStructField("name", DataTypes.StringType, false),
- DataTypes.createStructField("hasSiblings",
DataTypes.BooleanType, false),
- DataTypes.createStructField("age", DataTypes.DoubleType, false),
- DataTypes.createStructField(
- "timestamp",
- DataTypes.TimestampType,
- false,
- new MetadataBuilder().putString("precision",
"millis").build())
- });
- Dataset<Row> df = sparkSession.createDataFrame(data, schema);
- String dataPathPart = tempDir.toAbsolutePath() + "/partitioned_data";
- df.withColumn("year",
functions.year(functions.col("timestamp").cast(DataTypes.TimestampType)))
- .withColumn(
- "month",
- functions.date_format(functions.col("timestamp").cast(DataTypes.TimestampType), "MM"))
- .write()
- .mode(SaveMode.Overwrite)
- .partitionBy("year", "month")
- .parquet(dataPathPart);
- GenericTable table;
- table =
- GenericTable.getInstance(
- tableName, Paths.get(dataPathPart), sparkSession, jsc, sourceTableFormat, true);
- try (GenericTable tableToClose = table) {
- ConversionConfig conversionConfig =
- getTableSyncConfig(
- sourceTableFormat,
- SyncMode.FULL,
- tableName,
- table,
- targetTableFormats,
- xTablePartitionConfig,
- null);
- ConversionController conversionController =
- new ConversionController(jsc.hadoopConfiguration());
- conversionController.sync(conversionConfig, conversionSourceProvider);
- checkDatasetEquivalenceWithFilter(sourceTableFormat, tableToClose, targetTableFormats, true);
// update the current parquet file data with another attribute, then sync again
List<Row> dataToAppend =
- Arrays.asList(
+ Collections.singletonList(
RowFactory.create(
10,
"BobAppended",
@@ -327,34 +266,56 @@ public class ITParquetConversionSource {
new Timestamp(System.currentTimeMillis() + 1500)));
Dataset<Row> dfAppend = sparkSession.createDataFrame(dataToAppend, schema);
- dfAppend
- .withColumn(
- "year",
functions.year(functions.col("timestamp").cast(DataTypes.TimestampType)))
- .withColumn(
- "month",
- functions.date_format(functions.col("timestamp").cast(DataTypes.TimestampType), "MM"))
- .write()
- .mode(SaveMode.Append)
- .partitionBy("year", "month")
- .parquet(dataPathPart);
- GenericTable tableAppend;
- tableAppend =
- GenericTable.getInstance(
- tableName, Paths.get(dataPathPart), sparkSession, jsc, sourceTableFormat, true);
- try (GenericTable tableToCloseAppended = tableAppend) {
- ConversionConfig conversionConfigAppended =
- getTableSyncConfig(
- sourceTableFormat,
- SyncMode.FULL,
- tableName,
- tableAppend,
- targetTableFormats,
- xTablePartitionConfig,
- null);
- ConversionController conversionControllerAppended =
- new ConversionController(jsc.hadoopConfiguration());
- conversionControllerAppended.sync(conversionConfigAppended, conversionSourceProvider);
+ writeData(dfAppend, dataPath, xTablePartitionConfig);
+ ConversionConfig conversionConfigAppended =
+ getTableSyncConfig(
+ sourceTableFormat,
+ tableFormatPartitionDataHolder.getSyncMode(),
+ tableName,
+ table,
+ targetTableFormats,
+ xTablePartitionConfig,
+ null);
+ ConversionController conversionControllerAppended =
+ new ConversionController(jsc.hadoopConfiguration());
+ conversionControllerAppended.sync(conversionConfigAppended, conversionSourceProvider);
+ checkDatasetEquivalenceWithFilter(
+ sourceTableFormat, table, targetTableFormats, isPartitioned);
+ }
+ }
+
+ private void writeData(Dataset<Row> df, String dataPath, String partitionConfig) {
+ if (partitionConfig != null) {
+ // extract partition columns from config
+ String[] partitionCols =
+ Arrays.stream(partitionConfig.split(":")[2].split("/"))
+ .map(s -> s.split("=")[0])
+ .toArray(String[]::new);
+ // add partition columns to dataframe
+ for (String partitionCol : partitionCols) {
+ if (partitionCol.equals("year")) {
+ df =
+ df.withColumn(
+ "year",
functions.year(functions.col("timestamp").cast(DataTypes.TimestampType)));
+ } else if (partitionCol.equals("month")) {
+ df =
+ df.withColumn(
+ "month",
+ functions.date_format(
+ functions.col("timestamp").cast(DataTypes.TimestampType), "MM"));
+ } else if (partitionCol.equals("day")) {
+ df =
+ df.withColumn(
+ "day",
+ functions.date_format(
+ functions.col("timestamp").cast(DataTypes.TimestampType), "dd"));
+ } else {
+ throw new IllegalArgumentException("Unsupported partition column: " + partitionCol);
+ }
}
+ df.write().mode(SaveMode.Append).partitionBy(partitionCols).parquet(dataPath);
+ } else {
+ df.write().mode(SaveMode.Append).parquet(dataPath);
}
}
@@ -362,8 +323,7 @@ public class ITParquetConversionSource {
String sourceFormat,
GenericTable<?, ?> sourceTable,
List<String> targetFormats,
- boolean isPartitioned)
- throws URISyntaxException {
+ boolean isPartitioned) {
checkDatasetEquivalence(
sourceFormat,
sourceTable,
@@ -381,8 +341,7 @@ public class ITParquetConversionSource {
List<String> targetFormats,
Map<String, Map<String, String>> targetOptions,
Integer expectedCount,
- boolean isPartitioned)
- throws URISyntaxException {
+ boolean isPartitioned) {
Dataset<Row> sourceRows =
sparkSession
.read()
@@ -390,7 +349,8 @@ public class ITParquetConversionSource {
.options(sourceOptions)
.option("recursiveFileLookup", "true")
.option("pathGlobFilter", "*.parquet")
- .parquet(sourceTable.getDataPath());
+ .parquet(sourceTable.getDataPath())
+ .orderBy("id"); // order by id to ensure deterministic order for
comparison
Map<String, Dataset<Row>> targetRowsByFormat =
targetFormats.stream()
.collect(
@@ -409,7 +369,8 @@ public class ITParquetConversionSource {
.read()
.options(finalTargetOptions)
.format(targetFormat.toLowerCase())
- .load(sourceTable.getDataPath());
+ .load(sourceTable.getDataPath())
+ .orderBy("id");
}));
String[] selectColumnsArr = schema.fieldNames();
@@ -422,7 +383,6 @@ public class ITParquetConversionSource {
String format = entry.getKey();
Dataset<Row> targetRows = entry.getValue();
- targetRows.show();
List<String> dataset2Rows = targetRows.selectExpr(selectColumnsArr).toJSON().collectAsList();
@@ -466,5 +426,6 @@ public class ITParquetConversionSource {
String xTablePartitionConfig;
Optional<String> hudiSourceConfig;
String filter;
+ SyncMode syncMode;
}
}
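A note on the parameterization above: provideArgsForSyncTesting crosses each
partition config (null for non-partitioned) with every sync mode via flatMap, so
the matrix can grow without new test methods once incremental sync is supported.
A minimal sketch of the pattern, with hypothetical names and String stand-ins for
the SyncMode enum:

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Stream;
    import org.junit.jupiter.params.provider.Arguments;

    class SyncArgsSketch {
      // Cartesian product of partition configs and sync modes.
      static Stream<Arguments> provideArgs() {
        List<String> partitionConfigs =
            Arrays.asList(null, "timestamp:MONTH:year=yyyy/month=MM");
        List<String> syncModes = Arrays.asList("FULL"); // add "INCREMENTAL" when supported
        return partitionConfigs.stream()
            .flatMap(config -> syncModes.stream().map(mode -> Arguments.of(config, mode)));
      }
    }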