the-other-tim-brown commented on code in PR #728: URL: https://github.com/apache/incubator-xtable/pull/728#discussion_r2330207102
########## pom.xml: ########## @@ -655,7 +657,7 @@ </configuration> </execution> </executions> - </plugin> + </plugin><!-- Review Comment: If this plugin definition is no longer required, it can be removed. ########## pom.xml: ########## @@ -712,6 +730,16 @@ </execution> </executions> <configuration> + <argLine> Review Comment: Is this still required? ########## xtable-core/src/main/java/org/apache/xtable/hudi/BaseFileUpdatesExtractor.java: ########## @@ -252,7 +252,7 @@ private Map<String, HoodieColumnRangeMetadata<Comparable>> convertColStats( columnStat.getNumValues(), columnStat.getTotalSize(), -1L)) - .collect(Collectors.toMap(HoodieColumnRangeMetadata::getColumnName, Function.identity())); + .collect(Collectors.toMap(HoodieColumnRangeMetadata::getColumnName, metadata -> metadata)); Review Comment: These are functionally the same, why is this change required? ########## xtable-core/src/main/java/org/apache/xtable/reflection/ReflectionUtils.java: ########## @@ -43,17 +43,35 @@ public static <T> T createInstanceOfClass(String className, Object... constructo if (constructorArgs.length == 0) { return clazz.newInstance(); } - Class<?>[] constructorArgTypes = + /*Class<?>[] constructorArgTypes = Review Comment: You can remove this commented out code ########## xtable-core/src/main/java/org/apache/xtable/parquet/ParquetSchemaExtractor.java: ########## @@ -295,15 +299,30 @@ public InternalSchema toInternalSchema(Type schema, String parentPath) { .fieldId(fieldId == null ? null : fieldId.intValue()) .build()); } - if (currentRepetition != Repetition.REPEATED - && schema.asGroupType().getName() != "list" + // RECORD Type (non-nullable elements) + if (schema.asGroupType().getName() != "list" && !Arrays.asList("key_value", "map").contains(schema.asGroupType().getName())) { + boolean isNullable = + subFields.stream() + .filter(ele -> ele.getSchema().isNullable()) + .collect(Collectors.toList()) + .size() + == 0 Review Comment: You can use Stream's `noneMatch` to simplify this check ########## xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java: ########## @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
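To illustrate the `noneMatch` suggestion for the `ParquetSchemaExtractor` check above, here is a minimal standalone sketch. `NoneMatchSketch` and its nested `Field` class are hypothetical stand-ins for the field objects in the quoted diff, which expose `getSchema().isNullable()`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class NoneMatchSketch {
  // Hypothetical stand-in for the schema fields in the quoted diff.
  static class Field {
    private final boolean nullable;
    Field(boolean nullable) { this.nullable = nullable; }
    boolean isNullable() { return nullable; }
  }

  public static void main(String[] args) {
    List<Field> subFields = Arrays.asList(new Field(false), new Field(false));

    // Current form in the diff: filter + collect + size() == 0
    boolean viaCollect =
        subFields.stream()
            .filter(Field::isNullable)
            .collect(Collectors.toList())
            .size() == 0;

    // Suggested form: expresses the intent directly
    boolean viaNoneMatch = subFields.stream().noneMatch(Field::isNullable);

    System.out.println(viaCollect + " " + viaNoneMatch); // true true
  }
}
```

Besides being shorter, `noneMatch` short-circuits on the first nullable field instead of materializing an intermediate list.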
+ */ + +package org.apache.xtable.parquet; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.file.Paths; +import java.time.Instant; +import java.util.*; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import lombok.Builder; +import lombok.NonNull; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.functional.RemoteIterators; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.schema.MessageType; + +import org.apache.xtable.hudi.*; +import org.apache.xtable.hudi.HudiPathUtils; +import org.apache.xtable.model.*; +import org.apache.xtable.model.CommitsBacklog; +import org.apache.xtable.model.InstantsForIncrementalSync; +import org.apache.xtable.model.TableChange; +import org.apache.xtable.model.schema.InternalPartitionField; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.storage.*; +import org.apache.xtable.model.storage.FileFormat; +import org.apache.xtable.model.storage.InternalDataFile; +import org.apache.xtable.spi.extractor.ConversionSource; + +@Builder +public class ParquetConversionSource implements ConversionSource<Long> { + + private static final ParquetSchemaExtractor schemaExtractor = + ParquetSchemaExtractor.getInstance(); + + private static final ParquetMetadataExtractor parquetMetadataExtractor = + ParquetMetadataExtractor.getInstance(); + + private static final ParquetStatsExtractor parquetStatsExtractor = + ParquetStatsExtractor.getInstance(); + + private final ParquetPartitionValueExtractor partitionValueExtractor; + private final PathBasedPartitionSpecExtractor partitionSpecExtractor; + private final String tableName; + private final String basePath; + @NonNull private final Configuration hadoopConf; + + private InternalTable createInternalTableFromFile(LocatedFileStatus latestFile) { + ParquetMetadata parquetMetadata = + parquetMetadataExtractor.readParquetMetadata(hadoopConf, latestFile.getPath()); + MessageType parquetSchema = parquetMetadataExtractor.getSchema(parquetMetadata); + InternalSchema schema = schemaExtractor.toInternalSchema(parquetSchema, ""); + List<InternalPartitionField> partitionFields = partitionSpecExtractor.spec(schema); + + DataLayoutStrategy dataLayoutStrategy = + partitionFields.isEmpty() + ? 
DataLayoutStrategy.FLAT + : DataLayoutStrategy.HIVE_STYLE_PARTITION; + return InternalTable.builder() + .tableFormat(TableFormat.PARQUET) + .basePath(basePath) + .name(tableName) + .layoutStrategy(dataLayoutStrategy) + .partitioningFields(partitionFields) + .readSchema(schema) + .latestCommitTime(Instant.ofEpochMilli(latestFile.getModificationTime())) + .build(); + } + + @Override + public InternalTable getTable(Long modificationTime) { + // get parquetFile at specific time modificationTime + Stream<LocatedFileStatus> parquetFiles = getParquetFiles(hadoopConf, basePath); + LocatedFileStatus file = getParquetFileAt(parquetFiles, modificationTime); + return createInternalTableFromFile(file); + } + + private Stream<InternalDataFile> getInternalDataFiles(Stream<LocatedFileStatus> parquetFiles) { + return parquetFiles.map( + file -> + InternalDataFile.builder() + .physicalPath(file.getPath().toString()) + .fileFormat(FileFormat.APACHE_PARQUET) + .fileSizeBytes(file.getLen()) + .partitionValues( + partitionValueExtractor.extractPartitionValues( + partitionSpecExtractor.spec( + partitionValueExtractor.extractSchemaForParquetPartitions( + parquetMetadataExtractor.readParquetMetadata( + hadoopConf, file.getPath()), + file.getPath().toString())), + HudiPathUtils.getPartitionPath( + new Path(basePath), new Path(file.getPath().toString())))) + .lastModified(file.getModificationTime()) + .columnStats( + parquetStatsExtractor.getColumnStatsForaFile( + parquetMetadataExtractor.readParquetMetadata(hadoopConf, file.getPath()))) + .build()); + } + + private InternalDataFile createInternalDataFileFromParquetFile(FileStatus parquetFile) { + return InternalDataFile.builder() + .physicalPath(parquetFile.getPath().toString()) + .partitionValues( + partitionValueExtractor.extractPartitionValues( + partitionSpecExtractor.spec( + partitionValueExtractor.extractSchemaForParquetPartitions( + parquetMetadataExtractor.readParquetMetadata( + hadoopConf, parquetFile.getPath()), + parquetFile.getPath().toString())), + basePath)) + .lastModified(parquetFile.getModificationTime()) + .fileSizeBytes(parquetFile.getLen()) + .columnStats( + parquetStatsExtractor.getColumnStatsForaFile( + parquetMetadataExtractor.readParquetMetadata(hadoopConf, parquetFile.getPath()))) + .build(); + } + + @Override + public CommitsBacklog<Long> getCommitsBacklog(InstantsForIncrementalSync syncInstants) { + List<Long> commitsToProcess = + Collections.singletonList(syncInstants.getLastSyncInstant().toEpochMilli()); + return CommitsBacklog.<Long>builder().commitsToProcess(commitsToProcess).build(); + } + + @Override + public TableChange getTableChangeForCommit(Long modificationTime) { + Stream<LocatedFileStatus> parquetFiles = getParquetFiles(hadoopConf, basePath); + Set<InternalDataFile> addedInternalDataFiles = new HashSet<>(); + + List<FileStatus> tableChangesAfter = + parquetFiles + .filter(fileStatus -> fileStatus.getModificationTime() > modificationTime) + .collect(Collectors.toList()); + InternalTable internalTable = getMostRecentTable(parquetFiles); + for (FileStatus tableStatus : tableChangesAfter) { + InternalDataFile currentDataFile = createInternalDataFileFromParquetFile(tableStatus); + addedInternalDataFiles.add(currentDataFile); + } + + return TableChange.builder() + .tableAsOfChange(internalTable) + .filesDiff(InternalFilesDiff.builder().filesAdded(addedInternalDataFiles).build()) + .build(); + } + + private InternalTable getMostRecentTable(Stream<LocatedFileStatus> parquetFiles) { + LocatedFileStatus latestFile = 
getMostRecentParquetFile(parquetFiles); + return createInternalTableFromFile(latestFile); + } + + @Override + public InternalTable getCurrentTable() { + Stream<LocatedFileStatus> parquetFiles = getParquetFiles(hadoopConf, basePath); + return getMostRecentTable(parquetFiles); + } + + /** + * get current snapshot + * + * @return + */ + @Override + public InternalSnapshot getCurrentSnapshot() { + // to avoid consume the stream call the method twice to return the same stream of parquet files + Stream<InternalDataFile> internalDataFiles = + getInternalDataFiles(getParquetFiles(hadoopConf, basePath)); + InternalTable table = getMostRecentTable(getParquetFiles(hadoopConf, basePath)); + return InternalSnapshot.builder() + .table(table) + .sourceIdentifier( + getCommitIdentifier( + getMostRecentParquetFile(getParquetFiles(hadoopConf, basePath)) + .getModificationTime())) + .partitionedDataFiles(PartitionFileGroup.fromFiles(internalDataFiles)) + .build(); + } + + private LocatedFileStatus getMostRecentParquetFile(Stream<LocatedFileStatus> parquetFiles) { + return parquetFiles + .max(Comparator.comparing(FileStatus::getModificationTime)) + .orElseThrow(() -> new IllegalStateException("No files found")); + } + + private LocatedFileStatus getParquetFileAt( + Stream<LocatedFileStatus> parquetFiles, long modificationTime) { + return parquetFiles + .filter(fileStatus -> fileStatus.getModificationTime() == modificationTime) + .findFirst() + .orElseThrow( + () -> new IllegalStateException("No file found at " + Long.valueOf(modificationTime))); + } + + private Stream<LocatedFileStatus> getParquetFiles(Configuration hadoopConf, String basePath) { + try { + FileSystem fs = FileSystem.get(hadoopConf); + URI uriBasePath = new URI(basePath); + String parentPath = Paths.get(uriBasePath).toString(); + RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(new Path(parentPath), true); + return RemoteIterators.toList(iterator).stream() + .filter(file -> file.getPath().getName().toString().endsWith("parquet")); + } catch (IOException | URISyntaxException e) { + throw new RuntimeException(e); + } + } + + @Override + public boolean isIncrementalSyncSafeFrom(Instant instant) { + long modficationTime = instant.getEpochSecond(); + Stream<LocatedFileStatus> parquetFiles = getParquetFiles(hadoopConf, basePath); + LocatedFileStatus parquetFile = getMostRecentParquetFile(parquetFiles); + Path parquetFilePath = parquetFile.getPath(); + while (parquetFile.isFile() && parquetFile.getModificationTime() > modficationTime) { + // check the preceeding parquetFile + parquetFiles.filter(file -> !file.getPath().equals(parquetFilePath)); Review Comment: The output of this filter is not being used, what is the intention of this code? ########## xtable-core/src/test/java/org/apache/xtable/parquet/ITParquetConversionSource.java: ########## @@ -0,0 +1,435 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
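On the `isIncrementalSyncSafeFrom` comment above (the filter output not being used): `Stream.filter` is an intermediate operation, so the quoted line does nothing unless its result is assigned and consumed by a terminal operation, and the `parquetFiles` stream appears to have already been consumed by `getMostRecentParquetFile`, so reusing it would fail. A small standalone sketch of both points:

```java
import java.util.stream.Stream;

public class StreamFilterSketch {
  public static void main(String[] args) {
    Stream<String> files = Stream.of("part-0.parquet", "part-1.parquet");

    // filter() only builds a new stream; a bare `files.filter(...);` whose
    // result is never consumed has no effect at all.
    Stream<String> withoutFirst = files.filter(name -> !name.equals("part-0.parquet"));
    System.out.println(withoutFirst.count()); // 1

    // Reusing the original stream after it has been operated upon fails.
    try {
      files.count();
    } catch (IllegalStateException e) {
      System.out.println("stream has already been operated upon or closed");
    }
  }
}
```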
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.parquet; + +import static org.apache.xtable.GenericTable.getTableName; +import static org.apache.xtable.model.storage.TableFormat.*; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; + +import java.net.URISyntaxException; +import java.nio.file.Path; +import java.sql.Timestamp; +import java.time.Duration; +import java.time.Instant; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.time.temporal.ChronoUnit; +import java.util.*; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import lombok.Builder; +import lombok.Value; + +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.*; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.MetadataBuilder; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import org.apache.hudi.client.HoodieReadClient; +import org.apache.hudi.common.config.HoodieMetadataConfig; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import org.apache.xtable.GenericTable; +import org.apache.xtable.conversion.*; +import org.apache.xtable.conversion.ConversionConfig; +import org.apache.xtable.conversion.ConversionController; +import org.apache.xtable.conversion.ConversionSourceProvider; +import org.apache.xtable.conversion.SourceTable; +import org.apache.xtable.conversion.TargetTable; +import org.apache.xtable.hudi.HudiTestUtil; +import org.apache.xtable.model.sync.SyncMode; + +public class ITParquetConversionSource { + private static final DateTimeFormatter DATE_FORMAT = + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").withZone(ZoneId.of("UTC")); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + public static final String PARTITION_FIELD_SPEC_CONFIG = + "xtable.parquet.source.partition_field_spec_config"; + @TempDir public static Path tempDir; + private static JavaSparkContext jsc; + private static SparkSession sparkSession; + private static StructType schema; + + @BeforeAll + public static void setupOnce() { + SparkConf sparkConf = HudiTestUtil.getSparkConf(tempDir); + + String extraJavaOptions = "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED"; + sparkConf.set("spark.driver.extraJavaOptions", extraJavaOptions); + sparkConf = HoodieReadClient.addHoodieSupport(sparkConf); + sparkConf.set("parquet.avro.write-old-list-structure", "false"); + + String javaOpts = + "--add-opens=java.base/java.nio=ALL-UNNAMED " + + 
"--add-opens=java.base/java.lang=ALL-UNNAMED " + + "--add-opens=java.base/java.util=ALL-UNNAMED " + + "--add-opens=java.base/java.util.concurrent=ALL-UNNAMED " + + "--add-opens=java.base/java.io=ALL-UNNAMED" + + "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED" + + "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED"; + + sparkConf.set("spark.driver.extraJavaOptions", javaOpts); + sparkConf.set("spark.executor.extraJavaOptions", javaOpts); + + sparkSession = SparkSession.builder().config(sparkConf).getOrCreate(); + jsc = JavaSparkContext.fromSparkContext(sparkSession.sparkContext()); + + List<Row> data = + Arrays.asList( + RowFactory.create(1, "Alice", 30, new Timestamp(System.currentTimeMillis())), + RowFactory.create(2, "Bob", 24, new Timestamp(System.currentTimeMillis() + 1000)), + RowFactory.create(3, "Charlie", 35, new Timestamp(System.currentTimeMillis() + 2000)), + RowFactory.create(4, "David", 29, new Timestamp(System.currentTimeMillis() + 3000)), + RowFactory.create(5, "Eve", 22, new Timestamp(System.currentTimeMillis() + 4000))); + + schema = + DataTypes.createStructType( + new StructField[] { + DataTypes.createStructField("id", DataTypes.IntegerType, false), + DataTypes.createStructField("name", DataTypes.StringType, false), + DataTypes.createStructField("age", DataTypes.IntegerType, false), + DataTypes.createStructField( + "timestamp", + DataTypes.TimestampType, + false, + new MetadataBuilder().putString("precision", "millis").build()) + }); + + Dataset<Row> df = sparkSession.createDataFrame(data, schema); + df.withColumn("year", functions.year(functions.col("timestamp").cast(DataTypes.TimestampType))) + .write() + .mode(SaveMode.Overwrite) + .partitionBy("year") + .parquet(tempDir.toAbsolutePath().toString()); + + // test if data was written correctly + Dataset<Row> reloadedDf = sparkSession.read().parquet(tempDir.toAbsolutePath().toString()); + reloadedDf.show(); + reloadedDf.printSchema(); + } + + @AfterAll + public static void teardown() { + if (jsc != null) { + jsc.stop(); + jsc = null; + } + if (sparkSession != null) { + sparkSession.stop(); + sparkSession = null; + } + } + + private static Stream<Arguments> provideArgsForFilePartitionTesting() { + String timestampFilter = + String.format( + "timestamp_micros_nullable_field < timestamp_millis(%s)", + Instant.now().truncatedTo(ChronoUnit.DAYS).minus(2, ChronoUnit.DAYS).toEpochMilli()); + String levelFilter = "level = 'INFO'"; + String nestedLevelFilter = "nested_record.level = 'INFO'"; + String severityFilter = "severity = 1"; + String timestampAndLevelFilter = String.format("%s and %s", timestampFilter, levelFilter); + return Stream.of( + Arguments.of( + buildArgsForPartition( + PARQUET, + Arrays.asList(ICEBERG, DELTA, HUDI), + "timestamp:YEAR:year=YYYY", // ts:DAY:year=YYYY/month=MM/day=DD Review Comment: Is the plan to add these other cases? ########## xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java: ########## @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.parquet; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.file.Paths; +import java.time.Instant; +import java.util.*; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import lombok.Builder; +import lombok.NonNull; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.functional.RemoteIterators; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.schema.MessageType; + +import org.apache.xtable.hudi.*; +import org.apache.xtable.hudi.HudiPathUtils; +import org.apache.xtable.model.*; +import org.apache.xtable.model.CommitsBacklog; +import org.apache.xtable.model.InstantsForIncrementalSync; +import org.apache.xtable.model.TableChange; +import org.apache.xtable.model.schema.InternalPartitionField; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.storage.*; +import org.apache.xtable.model.storage.FileFormat; +import org.apache.xtable.model.storage.InternalDataFile; +import org.apache.xtable.spi.extractor.ConversionSource; + +@Builder +public class ParquetConversionSource implements ConversionSource<Long> { + + private static final ParquetSchemaExtractor schemaExtractor = + ParquetSchemaExtractor.getInstance(); + + private static final ParquetMetadataExtractor parquetMetadataExtractor = + ParquetMetadataExtractor.getInstance(); + + private static final ParquetStatsExtractor parquetStatsExtractor = + ParquetStatsExtractor.getInstance(); + + private final ParquetPartitionValueExtractor partitionValueExtractor; + private final PathBasedPartitionSpecExtractor partitionSpecExtractor; + private final String tableName; + private final String basePath; + @NonNull private final Configuration hadoopConf; + + private InternalTable createInternalTableFromFile(LocatedFileStatus latestFile) { + ParquetMetadata parquetMetadata = + parquetMetadataExtractor.readParquetMetadata(hadoopConf, latestFile.getPath()); + MessageType parquetSchema = parquetMetadataExtractor.getSchema(parquetMetadata); + InternalSchema schema = schemaExtractor.toInternalSchema(parquetSchema, ""); + List<InternalPartitionField> partitionFields = partitionSpecExtractor.spec(schema); + + DataLayoutStrategy dataLayoutStrategy = + partitionFields.isEmpty() + ? 
DataLayoutStrategy.FLAT + : DataLayoutStrategy.HIVE_STYLE_PARTITION; + return InternalTable.builder() + .tableFormat(TableFormat.PARQUET) + .basePath(basePath) + .name(tableName) + .layoutStrategy(dataLayoutStrategy) + .partitioningFields(partitionFields) + .readSchema(schema) + .latestCommitTime(Instant.ofEpochMilli(latestFile.getModificationTime())) + .build(); + } + + @Override + public InternalTable getTable(Long modificationTime) { + // get parquetFile at specific time modificationTime + Stream<LocatedFileStatus> parquetFiles = getParquetFiles(hadoopConf, basePath); + LocatedFileStatus file = getParquetFileAt(parquetFiles, modificationTime); + return createInternalTableFromFile(file); + } + + private Stream<InternalDataFile> getInternalDataFiles(Stream<LocatedFileStatus> parquetFiles) { + return parquetFiles.map( + file -> + InternalDataFile.builder() + .physicalPath(file.getPath().toString()) + .fileFormat(FileFormat.APACHE_PARQUET) + .fileSizeBytes(file.getLen()) + .partitionValues( + partitionValueExtractor.extractPartitionValues( + partitionSpecExtractor.spec( + partitionValueExtractor.extractSchemaForParquetPartitions( + parquetMetadataExtractor.readParquetMetadata( + hadoopConf, file.getPath()), + file.getPath().toString())), + HudiPathUtils.getPartitionPath( + new Path(basePath), new Path(file.getPath().toString())))) Review Comment: ```suggestion new Path(basePath), file.getPath()))) ``` ########## xtable-core/src/main/java/org/apache/xtable/parquet/ParquetSchemaExtractor.java: ########## @@ -295,15 +299,30 @@ public InternalSchema toInternalSchema(Type schema, String parentPath) { .fieldId(fieldId == null ? null : fieldId.intValue()) .build()); } - if (currentRepetition != Repetition.REPEATED - && schema.asGroupType().getName() != "list" + // RECORD Type (non-nullable elements) + if (schema.asGroupType().getName() != "list" Review Comment: String equality checks in java need to use `.equals` ########## xtable-core/src/main/java/org/apache/xtable/parquet/ParquetStatsExtractor.java: ########## @@ -132,19 +181,23 @@ public static InternalDataFile toInternalDataFile(Configuration hadoopConf, Path try { FileSystem fs = FileSystem.get(hadoopConf); file = fs.getFileStatus(parentPath); - // InputPartitionFields partitionInfo = initPartitionInfo(); footer = parquetMetadataExtractor.readParquetMetadata(hadoopConf, parentPath); MessageType schema = parquetMetadataExtractor.getSchema(footer); columnStatsForAFile = getColumnStatsForaFile(footer); - // partitionValues = partitionExtractor.createPartitionValues( - // partitionInfo); + partitionValues = + partitionValueExtractor.extractPartitionValues( + partitionSpecExtractor.spec( + partitionValueExtractor.extractSchemaForParquetPartitions( + parquetMetadataExtractor.readParquetMetadata(hadoopConf, file.getPath()), + file.getPath().toString())), + parentPath.toString()); } catch (java.io.IOException e) { Review Comment: @unical1988 can you address this? ########## pom.xml: ########## @@ -674,6 +676,21 @@ </goals> </execution> </executions> + </plugin>--> + <plugin> + <groupId>org.apache.maven.plugins</groupId> Review Comment: This is already declared on line 717, can you remove that definition? ########## xtable-core/src/main/java/org/apache/xtable/parquet/ParquetStatsConverterUtil.java: ########## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
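To expand on the `.equals` comment for `ParquetSchemaExtractor` above, a minimal standalone sketch of why `==` is unreliable for Strings (`StringEqualitySketch` is illustrative only):

```java
public class StringEqualitySketch {
  public static void main(String[] args) {
    // Built at runtime, so it is a different object from the interned literal "list".
    String name = new StringBuilder("li").append("st").toString();

    System.out.println(name == "list");       // false: compares references
    System.out.println(name.equals("list"));  // true: compares contents
    System.out.println(!"list".equals(name)); // false: null-safe form of the negated check
  }
}
```

Putting the literal first, as in `"list".equals(...)`, also avoids a `NullPointerException` if `getName()` can ever return null.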
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.parquet; + +import java.nio.charset.StandardCharsets; + +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.PrimitiveType; + +public class ParquetStatsConverterUtil { + public static Object convertStatBinaryTypeToLogicalType( + ColumnChunkMetaData columnMetaData, boolean isMin) { + Object returnedObj = null; + PrimitiveType primitiveType = columnMetaData.getPrimitiveType(); + switch (primitiveType.getPrimitiveTypeName()) { + case BINARY: // TODO check if other primitiveType' needs to be handled as well Review Comment: Can you create a new GitHub issue and reference that as part of the TODO? It will help in tracking the future work. ########## xtable-core/src/test/java/org/apache/xtable/parquet/ITParquetConversionSource.java: ########## @@ -0,0 +1,435 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.xtable.parquet; + +import static org.apache.xtable.GenericTable.getTableName; +import static org.apache.xtable.model.storage.TableFormat.*; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; + +import java.net.URISyntaxException; +import java.nio.file.Path; +import java.sql.Timestamp; +import java.time.Duration; +import java.time.Instant; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.time.temporal.ChronoUnit; +import java.util.*; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import lombok.Builder; +import lombok.Value; + +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.*; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.MetadataBuilder; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import org.apache.hudi.client.HoodieReadClient; +import org.apache.hudi.common.config.HoodieMetadataConfig; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import org.apache.xtable.GenericTable; +import org.apache.xtable.conversion.*; +import org.apache.xtable.conversion.ConversionConfig; +import org.apache.xtable.conversion.ConversionController; +import org.apache.xtable.conversion.ConversionSourceProvider; +import org.apache.xtable.conversion.SourceTable; +import org.apache.xtable.conversion.TargetTable; +import org.apache.xtable.hudi.HudiTestUtil; +import org.apache.xtable.model.sync.SyncMode; + +public class ITParquetConversionSource { + private static final DateTimeFormatter DATE_FORMAT = + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").withZone(ZoneId.of("UTC")); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + public static final String PARTITION_FIELD_SPEC_CONFIG = + "xtable.parquet.source.partition_field_spec_config"; + @TempDir public static Path tempDir; + private static JavaSparkContext jsc; + private static SparkSession sparkSession; + private static StructType schema; + + @BeforeAll + public static void setupOnce() { + SparkConf sparkConf = HudiTestUtil.getSparkConf(tempDir); + + String extraJavaOptions = "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED"; + sparkConf.set("spark.driver.extraJavaOptions", extraJavaOptions); + sparkConf = HoodieReadClient.addHoodieSupport(sparkConf); + sparkConf.set("parquet.avro.write-old-list-structure", "false"); + + String javaOpts = + "--add-opens=java.base/java.nio=ALL-UNNAMED " + + "--add-opens=java.base/java.lang=ALL-UNNAMED " + + "--add-opens=java.base/java.util=ALL-UNNAMED " + + "--add-opens=java.base/java.util.concurrent=ALL-UNNAMED " + + "--add-opens=java.base/java.io=ALL-UNNAMED" + + "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED" + + "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED"; + + sparkConf.set("spark.driver.extraJavaOptions", javaOpts); + sparkConf.set("spark.executor.extraJavaOptions", 
javaOpts); + + sparkSession = SparkSession.builder().config(sparkConf).getOrCreate(); + jsc = JavaSparkContext.fromSparkContext(sparkSession.sparkContext()); + + List<Row> data = + Arrays.asList( + RowFactory.create(1, "Alice", 30, new Timestamp(System.currentTimeMillis())), + RowFactory.create(2, "Bob", 24, new Timestamp(System.currentTimeMillis() + 1000)), + RowFactory.create(3, "Charlie", 35, new Timestamp(System.currentTimeMillis() + 2000)), + RowFactory.create(4, "David", 29, new Timestamp(System.currentTimeMillis() + 3000)), + RowFactory.create(5, "Eve", 22, new Timestamp(System.currentTimeMillis() + 4000))); + + schema = + DataTypes.createStructType( + new StructField[] { + DataTypes.createStructField("id", DataTypes.IntegerType, false), + DataTypes.createStructField("name", DataTypes.StringType, false), + DataTypes.createStructField("age", DataTypes.IntegerType, false), + DataTypes.createStructField( + "timestamp", + DataTypes.TimestampType, + false, + new MetadataBuilder().putString("precision", "millis").build()) + }); + + Dataset<Row> df = sparkSession.createDataFrame(data, schema); + df.withColumn("year", functions.year(functions.col("timestamp").cast(DataTypes.TimestampType))) + .write() + .mode(SaveMode.Overwrite) + .partitionBy("year") + .parquet(tempDir.toAbsolutePath().toString()); + + // test if data was written correctly + Dataset<Row> reloadedDf = sparkSession.read().parquet(tempDir.toAbsolutePath().toString()); + reloadedDf.show(); + reloadedDf.printSchema(); + } + + @AfterAll + public static void teardown() { + if (jsc != null) { + jsc.stop(); + jsc = null; + } + if (sparkSession != null) { + sparkSession.stop(); + sparkSession = null; + } + } + + private static Stream<Arguments> provideArgsForFilePartitionTesting() { + String timestampFilter = + String.format( + "timestamp_micros_nullable_field < timestamp_millis(%s)", + Instant.now().truncatedTo(ChronoUnit.DAYS).minus(2, ChronoUnit.DAYS).toEpochMilli()); + String levelFilter = "level = 'INFO'"; + String nestedLevelFilter = "nested_record.level = 'INFO'"; + String severityFilter = "severity = 1"; + String timestampAndLevelFilter = String.format("%s and %s", timestampFilter, levelFilter); + return Stream.of( + Arguments.of( + buildArgsForPartition( + PARQUET, + Arrays.asList(ICEBERG, DELTA, HUDI), + "timestamp:YEAR:year=YYYY", // ts:DAY:year=YYYY/month=MM/day=DD + // "year=YYYY/month=MM/day=DD" + "timestamp:YEAR:year=YYYY", + levelFilter))); + } + + private static TableFormatPartitionDataHolder buildArgsForPartition( + String sourceFormat, + List<String> targetFormats, + String hudiPartitionConfig, + String xTablePartitionConfig, + String filter) { + return TableFormatPartitionDataHolder.builder() + .sourceTableFormat(sourceFormat) + .targetTableFormats(targetFormats) + .hudiSourceConfig(Optional.ofNullable(hudiPartitionConfig)) + .xTablePartitionConfig(xTablePartitionConfig) + .filter(filter) + .build(); + } + + private static ConversionConfig getTableSyncConfig( + String sourceTableFormat, + SyncMode syncMode, + String tableName, + GenericTable table, + List<String> targetTableFormats, + String partitionConfig, + Duration metadataRetention) { + Properties sourceProperties = new Properties(); + if (partitionConfig != null) { + sourceProperties.put(PARTITION_FIELD_SPEC_CONFIG, partitionConfig); + } + SourceTable sourceTable = + SourceTable.builder() + .name(tableName) + .formatName(sourceTableFormat) + .basePath(table.getBasePath()) + .dataPath(table.getDataPath()) + .additionalProperties(sourceProperties) + .build(); 
+ + List<TargetTable> targetTables = + targetTableFormats.stream() + .map( + formatName -> + TargetTable.builder() + .name(tableName) + .formatName(formatName) + // set the metadata path to the data path as the default (required by Hudi) + .basePath(table.getDataPath()) + .metadataRetention(metadataRetention) + .build()) + .collect(Collectors.toList()); + + return ConversionConfig.builder() + .sourceTable(sourceTable) + .targetTables(targetTables) + .syncMode(syncMode) + .build(); + } + + private ConversionSourceProvider<?> getConversionSourceProvider(String sourceTableFormat) { + if (sourceTableFormat.equalsIgnoreCase(PARQUET)) { + ConversionSourceProvider<Long> parquetConversionSourceProvider = + new ParquetConversionSourceProvider(); + parquetConversionSourceProvider.init(jsc.hadoopConfiguration()); + return parquetConversionSourceProvider; + } else { + throw new IllegalArgumentException("Unsupported source format: " + sourceTableFormat); + } + } + + @ParameterizedTest + @MethodSource("provideArgsForFilePartitionTesting") + public void testFilePartitionedData( Review Comment: Can you add a case where the data is not partitioned as well? ########## xtable-core/src/test/java/org/apache/xtable/parquet/ITParquetConversionSource.java: ########## @@ -0,0 +1,435 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
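One possible shape for the requested non-partitioned test case, sketched under the assumption that it can reuse the `sparkSession`, `data`, and `schema` built in `setupOnce()`; `NonPartitionedWriteSketch` and `outputPath` are hypothetical:

```java
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructType;

public class NonPartitionedWriteSketch {
  /**
   * Writes the same rows as setupOnce() but without partitionBy, producing a
   * flat parquet layout for a second parameterized test case.
   */
  static void writeNonPartitioned(
      SparkSession spark, List<Row> data, StructType schema, String outputPath) {
    Dataset<Row> df = spark.createDataFrame(data, schema);
    df.write()
        .mode(SaveMode.Overwrite)
        .parquet(outputPath); // no partitionBy, so no hive-style partition directories
  }
}
```

A flat layout like this should exercise the `DataLayoutStrategy.FLAT` branch of the quoted `ParquetConversionSource`.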
+ */ + +package org.apache.xtable.parquet; + +import static org.apache.xtable.GenericTable.getTableName; +import static org.apache.xtable.model.storage.TableFormat.*; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; + +import java.net.URISyntaxException; +import java.nio.file.Path; +import java.sql.Timestamp; +import java.time.Duration; +import java.time.Instant; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.time.temporal.ChronoUnit; +import java.util.*; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import lombok.Builder; +import lombok.Value; + +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.*; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.MetadataBuilder; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import org.apache.hudi.client.HoodieReadClient; +import org.apache.hudi.common.config.HoodieMetadataConfig; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import org.apache.xtable.GenericTable; +import org.apache.xtable.conversion.*; +import org.apache.xtable.conversion.ConversionConfig; +import org.apache.xtable.conversion.ConversionController; +import org.apache.xtable.conversion.ConversionSourceProvider; +import org.apache.xtable.conversion.SourceTable; +import org.apache.xtable.conversion.TargetTable; +import org.apache.xtable.hudi.HudiTestUtil; +import org.apache.xtable.model.sync.SyncMode; + +public class ITParquetConversionSource { + private static final DateTimeFormatter DATE_FORMAT = + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").withZone(ZoneId.of("UTC")); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + public static final String PARTITION_FIELD_SPEC_CONFIG = + "xtable.parquet.source.partition_field_spec_config"; + @TempDir public static Path tempDir; + private static JavaSparkContext jsc; + private static SparkSession sparkSession; + private static StructType schema; + + @BeforeAll + public static void setupOnce() { + SparkConf sparkConf = HudiTestUtil.getSparkConf(tempDir); + + String extraJavaOptions = "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED"; + sparkConf.set("spark.driver.extraJavaOptions", extraJavaOptions); + sparkConf = HoodieReadClient.addHoodieSupport(sparkConf); + sparkConf.set("parquet.avro.write-old-list-structure", "false"); + + String javaOpts = + "--add-opens=java.base/java.nio=ALL-UNNAMED " + + "--add-opens=java.base/java.lang=ALL-UNNAMED " + + "--add-opens=java.base/java.util=ALL-UNNAMED " + + "--add-opens=java.base/java.util.concurrent=ALL-UNNAMED " + + "--add-opens=java.base/java.io=ALL-UNNAMED" + + "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED" + + "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED"; + + sparkConf.set("spark.driver.extraJavaOptions", javaOpts); + sparkConf.set("spark.executor.extraJavaOptions", 
javaOpts); + + sparkSession = SparkSession.builder().config(sparkConf).getOrCreate(); + jsc = JavaSparkContext.fromSparkContext(sparkSession.sparkContext()); + + List<Row> data = + Arrays.asList( + RowFactory.create(1, "Alice", 30, new Timestamp(System.currentTimeMillis())), + RowFactory.create(2, "Bob", 24, new Timestamp(System.currentTimeMillis() + 1000)), + RowFactory.create(3, "Charlie", 35, new Timestamp(System.currentTimeMillis() + 2000)), + RowFactory.create(4, "David", 29, new Timestamp(System.currentTimeMillis() + 3000)), + RowFactory.create(5, "Eve", 22, new Timestamp(System.currentTimeMillis() + 4000))); + + schema = + DataTypes.createStructType( + new StructField[] { + DataTypes.createStructField("id", DataTypes.IntegerType, false), + DataTypes.createStructField("name", DataTypes.StringType, false), + DataTypes.createStructField("age", DataTypes.IntegerType, false), + DataTypes.createStructField( + "timestamp", + DataTypes.TimestampType, + false, + new MetadataBuilder().putString("precision", "millis").build()) + }); + + Dataset<Row> df = sparkSession.createDataFrame(data, schema); + df.withColumn("year", functions.year(functions.col("timestamp").cast(DataTypes.TimestampType))) + .write() + .mode(SaveMode.Overwrite) + .partitionBy("year") + .parquet(tempDir.toAbsolutePath().toString()); + + // test if data was written correctly + Dataset<Row> reloadedDf = sparkSession.read().parquet(tempDir.toAbsolutePath().toString()); + reloadedDf.show(); + reloadedDf.printSchema(); + } + + @AfterAll + public static void teardown() { + if (jsc != null) { + jsc.stop(); + jsc = null; + } + if (sparkSession != null) { + sparkSession.stop(); + sparkSession = null; + } + } + + private static Stream<Arguments> provideArgsForFilePartitionTesting() { + String timestampFilter = + String.format( + "timestamp_micros_nullable_field < timestamp_millis(%s)", + Instant.now().truncatedTo(ChronoUnit.DAYS).minus(2, ChronoUnit.DAYS).toEpochMilli()); + String levelFilter = "level = 'INFO'"; + String nestedLevelFilter = "nested_record.level = 'INFO'"; + String severityFilter = "severity = 1"; + String timestampAndLevelFilter = String.format("%s and %s", timestampFilter, levelFilter); + return Stream.of( + Arguments.of( + buildArgsForPartition( + PARQUET, + Arrays.asList(ICEBERG, DELTA, HUDI), + "timestamp:YEAR:year=YYYY", // ts:DAY:year=YYYY/month=MM/day=DD + // "year=YYYY/month=MM/day=DD" + "timestamp:YEAR:year=YYYY", + levelFilter))); + } + + private static TableFormatPartitionDataHolder buildArgsForPartition( + String sourceFormat, + List<String> targetFormats, + String hudiPartitionConfig, + String xTablePartitionConfig, + String filter) { + return TableFormatPartitionDataHolder.builder() + .sourceTableFormat(sourceFormat) + .targetTableFormats(targetFormats) + .hudiSourceConfig(Optional.ofNullable(hudiPartitionConfig)) + .xTablePartitionConfig(xTablePartitionConfig) + .filter(filter) + .build(); + } + + private static ConversionConfig getTableSyncConfig( + String sourceTableFormat, + SyncMode syncMode, + String tableName, + GenericTable table, + List<String> targetTableFormats, + String partitionConfig, + Duration metadataRetention) { + Properties sourceProperties = new Properties(); + if (partitionConfig != null) { + sourceProperties.put(PARTITION_FIELD_SPEC_CONFIG, partitionConfig); + } + SourceTable sourceTable = + SourceTable.builder() + .name(tableName) + .formatName(sourceTableFormat) + .basePath(table.getBasePath()) + .dataPath(table.getDataPath()) + .additionalProperties(sourceProperties) + .build(); 
+ + List<TargetTable> targetTables = + targetTableFormats.stream() + .map( + formatName -> + TargetTable.builder() + .name(tableName) + .formatName(formatName) + // set the metadata path to the data path as the default (required by Hudi) + .basePath(table.getDataPath()) + .metadataRetention(metadataRetention) + .build()) + .collect(Collectors.toList()); + + return ConversionConfig.builder() + .sourceTable(sourceTable) + .targetTables(targetTables) + .syncMode(syncMode) + .build(); + } + + private ConversionSourceProvider<?> getConversionSourceProvider(String sourceTableFormat) { + if (sourceTableFormat.equalsIgnoreCase(PARQUET)) { + ConversionSourceProvider<Long> parquetConversionSourceProvider = + new ParquetConversionSourceProvider(); + parquetConversionSourceProvider.init(jsc.hadoopConfiguration()); + return parquetConversionSourceProvider; + } else { + throw new IllegalArgumentException("Unsupported source format: " + sourceTableFormat); + } + } + + @ParameterizedTest + @MethodSource("provideArgsForFilePartitionTesting") + public void testFilePartitionedData( + TableFormatPartitionDataHolder tableFormatPartitionDataHolder) { + String tableName = getTableName(); + String sourceTableFormat = tableFormatPartitionDataHolder.getSourceTableFormat(); + List<String> targetTableFormats = tableFormatPartitionDataHolder.getTargetTableFormats(); + // Optional<String> hudiPartitionConfig = tableFormatPartitionDataHolder.getHudiSourceConfig(); + String xTablePartitionConfig = tableFormatPartitionDataHolder.getXTablePartitionConfig(); + String filter = tableFormatPartitionDataHolder.getFilter(); + ConversionSourceProvider<?> conversionSourceProvider = + getConversionSourceProvider(sourceTableFormat); + GenericTable table; + table = + GenericTable.getInstance(tableName, tempDir, sparkSession, jsc, sourceTableFormat, true); + try (GenericTable tableToClose = table) { + ConversionConfig conversionConfig = + getTableSyncConfig( + sourceTableFormat, + SyncMode.FULL, + tableName, + table, + targetTableFormats, + xTablePartitionConfig, + null); + ConversionController conversionController = + new ConversionController(jsc.hadoopConfiguration()); + conversionController.sync(conversionConfig, conversionSourceProvider); Review Comment: Right now the test is only performing a single sync. We'll want to make multiple syncs to ensure that the behavior is correct. ########## xtable-core/src/test/java/org/apache/xtable/parquet/ITParquetConversionSource.java: ########## @@ -0,0 +1,435 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
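For the multiple-sync comment above, a rough sketch of how a second round could be driven; `MultiSyncSketch` and the `writeMoreRows` callback are hypothetical placeholders for whatever produces the additional parquet files, while the other names come from the quoted test:

```java
import org.apache.hadoop.conf.Configuration;

import org.apache.xtable.conversion.ConversionConfig;
import org.apache.xtable.conversion.ConversionController;
import org.apache.xtable.conversion.ConversionSourceProvider;

public class MultiSyncSketch {
  /**
   * Runs two syncs with new data written in between, so the second round has to
   * pick up the added files rather than just repeat the first snapshot.
   */
  static void syncTwice(
      Configuration hadoopConf,
      ConversionConfig conversionConfig,
      ConversionSourceProvider<?> sourceProvider,
      Runnable writeMoreRows) { // hypothetical: appends another batch of parquet files
    ConversionController controller = new ConversionController(hadoopConf);
    controller.sync(conversionConfig, sourceProvider);

    writeMoreRows.run();

    // Second sync; assertions against the target tables would follow here.
    controller.sync(conversionConfig, sourceProvider);
  }
}
```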
+ */ + +package org.apache.xtable.parquet; + +import static org.apache.xtable.GenericTable.getTableName; +import static org.apache.xtable.model.storage.TableFormat.*; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; + +import java.net.URISyntaxException; +import java.nio.file.Path; +import java.sql.Timestamp; +import java.time.Duration; +import java.time.Instant; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.time.temporal.ChronoUnit; +import java.util.*; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import lombok.Builder; +import lombok.Value; + +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.*; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.MetadataBuilder; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import org.apache.hudi.client.HoodieReadClient; +import org.apache.hudi.common.config.HoodieMetadataConfig; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import org.apache.xtable.GenericTable; +import org.apache.xtable.conversion.*; +import org.apache.xtable.conversion.ConversionConfig; +import org.apache.xtable.conversion.ConversionController; +import org.apache.xtable.conversion.ConversionSourceProvider; +import org.apache.xtable.conversion.SourceTable; +import org.apache.xtable.conversion.TargetTable; +import org.apache.xtable.hudi.HudiTestUtil; +import org.apache.xtable.model.sync.SyncMode; + +public class ITParquetConversionSource { + private static final DateTimeFormatter DATE_FORMAT = + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").withZone(ZoneId.of("UTC")); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + public static final String PARTITION_FIELD_SPEC_CONFIG = + "xtable.parquet.source.partition_field_spec_config"; + @TempDir public static Path tempDir; + private static JavaSparkContext jsc; + private static SparkSession sparkSession; + private static StructType schema; + + @BeforeAll + public static void setupOnce() { + SparkConf sparkConf = HudiTestUtil.getSparkConf(tempDir); + + String extraJavaOptions = "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED"; + sparkConf.set("spark.driver.extraJavaOptions", extraJavaOptions); + sparkConf = HoodieReadClient.addHoodieSupport(sparkConf); + sparkConf.set("parquet.avro.write-old-list-structure", "false"); + + String javaOpts = + "--add-opens=java.base/java.nio=ALL-UNNAMED " + + "--add-opens=java.base/java.lang=ALL-UNNAMED " + + "--add-opens=java.base/java.util=ALL-UNNAMED " + + "--add-opens=java.base/java.util.concurrent=ALL-UNNAMED " + + "--add-opens=java.base/java.io=ALL-UNNAMED" + + "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED" + + "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED"; + + sparkConf.set("spark.driver.extraJavaOptions", javaOpts); + sparkConf.set("spark.executor.extraJavaOptions", 
javaOpts); + + sparkSession = SparkSession.builder().config(sparkConf).getOrCreate(); + jsc = JavaSparkContext.fromSparkContext(sparkSession.sparkContext()); + + List<Row> data = + Arrays.asList( + RowFactory.create(1, "Alice", 30, new Timestamp(System.currentTimeMillis())), + RowFactory.create(2, "Bob", 24, new Timestamp(System.currentTimeMillis() + 1000)), + RowFactory.create(3, "Charlie", 35, new Timestamp(System.currentTimeMillis() + 2000)), + RowFactory.create(4, "David", 29, new Timestamp(System.currentTimeMillis() + 3000)), + RowFactory.create(5, "Eve", 22, new Timestamp(System.currentTimeMillis() + 4000))); + + schema = + DataTypes.createStructType( + new StructField[] { + DataTypes.createStructField("id", DataTypes.IntegerType, false), + DataTypes.createStructField("name", DataTypes.StringType, false), + DataTypes.createStructField("age", DataTypes.IntegerType, false), + DataTypes.createStructField( + "timestamp", + DataTypes.TimestampType, + false, + new MetadataBuilder().putString("precision", "millis").build()) + }); + + Dataset<Row> df = sparkSession.createDataFrame(data, schema); + df.withColumn("year", functions.year(functions.col("timestamp").cast(DataTypes.TimestampType))) + .write() + .mode(SaveMode.Overwrite) + .partitionBy("year") + .parquet(tempDir.toAbsolutePath().toString()); + + // test if data was written correctly + Dataset<Row> reloadedDf = sparkSession.read().parquet(tempDir.toAbsolutePath().toString()); + reloadedDf.show(); + reloadedDf.printSchema(); + } + + @AfterAll + public static void teardown() { + if (jsc != null) { + jsc.stop(); + jsc = null; + } + if (sparkSession != null) { + sparkSession.stop(); + sparkSession = null; + } + } + + private static Stream<Arguments> provideArgsForFilePartitionTesting() { + String timestampFilter = + String.format( + "timestamp_micros_nullable_field < timestamp_millis(%s)", + Instant.now().truncatedTo(ChronoUnit.DAYS).minus(2, ChronoUnit.DAYS).toEpochMilli()); + String levelFilter = "level = 'INFO'"; + String nestedLevelFilter = "nested_record.level = 'INFO'"; + String severityFilter = "severity = 1"; + String timestampAndLevelFilter = String.format("%s and %s", timestampFilter, levelFilter); + return Stream.of( + Arguments.of( + buildArgsForPartition( + PARQUET, + Arrays.asList(ICEBERG, DELTA, HUDI), + "timestamp:YEAR:year=YYYY", // ts:DAY:year=YYYY/month=MM/day=DD + // "year=YYYY/month=MM/day=DD" + "timestamp:YEAR:year=YYYY", + levelFilter))); + } + + private static TableFormatPartitionDataHolder buildArgsForPartition( + String sourceFormat, + List<String> targetFormats, + String hudiPartitionConfig, + String xTablePartitionConfig, + String filter) { + return TableFormatPartitionDataHolder.builder() + .sourceTableFormat(sourceFormat) + .targetTableFormats(targetFormats) + .hudiSourceConfig(Optional.ofNullable(hudiPartitionConfig)) + .xTablePartitionConfig(xTablePartitionConfig) + .filter(filter) + .build(); + } + + private static ConversionConfig getTableSyncConfig( + String sourceTableFormat, + SyncMode syncMode, + String tableName, + GenericTable table, + List<String> targetTableFormats, + String partitionConfig, + Duration metadataRetention) { + Properties sourceProperties = new Properties(); + if (partitionConfig != null) { + sourceProperties.put(PARTITION_FIELD_SPEC_CONFIG, partitionConfig); + } + SourceTable sourceTable = + SourceTable.builder() + .name(tableName) + .formatName(sourceTableFormat) + .basePath(table.getBasePath()) + .dataPath(table.getDataPath()) + .additionalProperties(sourceProperties) + .build(); 
+ + List<TargetTable> targetTables = + targetTableFormats.stream() + .map( + formatName -> + TargetTable.builder() + .name(tableName) + .formatName(formatName) + // set the metadata path to the data path as the default (required by Hudi) + .basePath(table.getDataPath()) + .metadataRetention(metadataRetention) + .build()) + .collect(Collectors.toList()); + + return ConversionConfig.builder() + .sourceTable(sourceTable) + .targetTables(targetTables) + .syncMode(syncMode) + .build(); + } + + private ConversionSourceProvider<?> getConversionSourceProvider(String sourceTableFormat) { + if (sourceTableFormat.equalsIgnoreCase(PARQUET)) { + ConversionSourceProvider<Long> parquetConversionSourceProvider = + new ParquetConversionSourceProvider(); + parquetConversionSourceProvider.init(jsc.hadoopConfiguration()); + return parquetConversionSourceProvider; + } else { + throw new IllegalArgumentException("Unsupported source format: " + sourceTableFormat); + } + } + + @ParameterizedTest + @MethodSource("provideArgsForFilePartitionTesting") + public void testFilePartitionedData( + TableFormatPartitionDataHolder tableFormatPartitionDataHolder) { + String tableName = getTableName(); + String sourceTableFormat = tableFormatPartitionDataHolder.getSourceTableFormat(); + List<String> targetTableFormats = tableFormatPartitionDataHolder.getTargetTableFormats(); + // Optional<String> hudiPartitionConfig = tableFormatPartitionDataHolder.getHudiSourceConfig(); + String xTablePartitionConfig = tableFormatPartitionDataHolder.getXTablePartitionConfig(); + String filter = tableFormatPartitionDataHolder.getFilter(); + ConversionSourceProvider<?> conversionSourceProvider = + getConversionSourceProvider(sourceTableFormat); + GenericTable table; + table = + GenericTable.getInstance(tableName, tempDir, sparkSession, jsc, sourceTableFormat, true); + try (GenericTable tableToClose = table) { + ConversionConfig conversionConfig = + getTableSyncConfig( + sourceTableFormat, + SyncMode.FULL, + tableName, + table, + targetTableFormats, + xTablePartitionConfig, + null); + ConversionController conversionController = + new ConversionController(jsc.hadoopConfiguration()); + conversionController.sync(conversionConfig, conversionSourceProvider); + checkDatasetEquivalenceWithFilter( + sourceTableFormat, tableToClose, targetTableFormats, filter); + } catch (URISyntaxException e) { + e.printStackTrace(); + } Review Comment: This will hide errors due to exceptions. You can just let the test throw exceptions. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@xtable.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
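On the final comment about the swallowed `URISyntaxException`: JUnit reports any exception thrown from a test method as a failure, so the try/catch with `printStackTrace` can simply be replaced by a `throws` clause. A minimal sketch (plain `@Test` used here for brevity):

```java
import org.junit.jupiter.api.Test;

public class ThrowInsteadOfCatchSketch {

  @Test
  void testFilePartitionedData() throws Exception {
    // ... arrange, sync, and assert as in the quoted test ...
    // Any URISyntaxException (or other exception) now surfaces as a test
    // failure with its stack trace instead of being printed and ignored.
  }
}
```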