the-other-tim-brown commented on code in PR #728:
URL: https://github.com/apache/incubator-xtable/pull/728#discussion_r2330207102


##########
pom.xml:
##########
@@ -655,7 +657,7 @@
                         </configuration>
                     </execution>
                 </executions>
-            </plugin>
+            </plugin><!--

Review Comment:
   If this plugin definition is no longer required, it can be removed. 



##########
pom.xml:
##########
@@ -712,6 +730,16 @@
                     </execution>
                 </executions>
                 <configuration>
+                    <argLine>

Review Comment:
   Is this still required?



##########
xtable-core/src/main/java/org/apache/xtable/hudi/BaseFileUpdatesExtractor.java:
##########
@@ -252,7 +252,7 @@ private Map<String, HoodieColumnRangeMetadata<Comparable>> convertColStats(
                     columnStat.getNumValues(),
                     columnStat.getTotalSize(),
                     -1L))
-        .collect(Collectors.toMap(HoodieColumnRangeMetadata::getColumnName, Function.identity()));
+        .collect(Collectors.toMap(HoodieColumnRangeMetadata::getColumnName, metadata -> metadata));

Review Comment:
   These are functionally the same; why is this change required?



##########
xtable-core/src/main/java/org/apache/xtable/reflection/ReflectionUtils.java:
##########
@@ -43,17 +43,35 @@ public static <T> T createInstanceOfClass(String className, Object... constructo
       if (constructorArgs.length == 0) {
         return clazz.newInstance();
       }
-      Class<?>[] constructorArgTypes =
+      /*Class<?>[] constructorArgTypes =

Review Comment:
   You can remove this commented-out code.



##########
xtable-core/src/main/java/org/apache/xtable/parquet/ParquetSchemaExtractor.java:
##########
@@ -295,15 +299,30 @@ public InternalSchema toInternalSchema(Type schema, String parentPath) {
                   .fieldId(fieldId == null ? null : fieldId.intValue())
                   .build());
         }
-        if (currentRepetition != Repetition.REPEATED
-            && schema.asGroupType().getName() != "list"
+        // RECORD Type (non-nullable elements)
+        if (schema.asGroupType().getName() != "list"
            && !Arrays.asList("key_value", "map").contains(schema.asGroupType().getName())) {
+          boolean isNullable =
+              subFields.stream()
+                          .filter(ele -> ele.getSchema().isNullable())
+                          .collect(Collectors.toList())
+                          .size()
+                      == 0

Review Comment:
   You can use Stream's `noneMatch` to simplify this check.
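   For example, a minimal sketch of the quoted filter/collect/size comparison rewritten with `noneMatch` (same behavior, just shorter):
   ```java
   boolean isNullable = subFields.stream().noneMatch(ele -> ele.getSchema().isNullable());
   ```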



##########
xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java:
##########
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.parquet;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+import java.time.Instant;
+import java.util.*;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import lombok.Builder;
+import lombok.NonNull;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.functional.RemoteIterators;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
+
+import org.apache.xtable.hudi.*;
+import org.apache.xtable.hudi.HudiPathUtils;
+import org.apache.xtable.model.*;
+import org.apache.xtable.model.CommitsBacklog;
+import org.apache.xtable.model.InstantsForIncrementalSync;
+import org.apache.xtable.model.TableChange;
+import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.storage.*;
+import org.apache.xtable.model.storage.FileFormat;
+import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.spi.extractor.ConversionSource;
+
+@Builder
+public class ParquetConversionSource implements ConversionSource<Long> {
+
+  private static final ParquetSchemaExtractor schemaExtractor =
+      ParquetSchemaExtractor.getInstance();
+
+  private static final ParquetMetadataExtractor parquetMetadataExtractor =
+      ParquetMetadataExtractor.getInstance();
+
+  private static final ParquetStatsExtractor parquetStatsExtractor =
+      ParquetStatsExtractor.getInstance();
+
+  private final ParquetPartitionValueExtractor partitionValueExtractor;
+  private final PathBasedPartitionSpecExtractor partitionSpecExtractor;
+  private final String tableName;
+  private final String basePath;
+  @NonNull private final Configuration hadoopConf;
+
+  private InternalTable createInternalTableFromFile(LocatedFileStatus 
latestFile) {
+    ParquetMetadata parquetMetadata =
+        parquetMetadataExtractor.readParquetMetadata(hadoopConf, 
latestFile.getPath());
+    MessageType parquetSchema = 
parquetMetadataExtractor.getSchema(parquetMetadata);
+    InternalSchema schema = schemaExtractor.toInternalSchema(parquetSchema, 
"");
+    List<InternalPartitionField> partitionFields = 
partitionSpecExtractor.spec(schema);
+
+    DataLayoutStrategy dataLayoutStrategy =
+        partitionFields.isEmpty()
+            ? DataLayoutStrategy.FLAT
+            : DataLayoutStrategy.HIVE_STYLE_PARTITION;
+    return InternalTable.builder()
+        .tableFormat(TableFormat.PARQUET)
+        .basePath(basePath)
+        .name(tableName)
+        .layoutStrategy(dataLayoutStrategy)
+        .partitioningFields(partitionFields)
+        .readSchema(schema)
+        
.latestCommitTime(Instant.ofEpochMilli(latestFile.getModificationTime()))
+        .build();
+  }
+
+  @Override
+  public InternalTable getTable(Long modificationTime) {
+    // get parquetFile at specific time modificationTime
+    Stream<LocatedFileStatus> parquetFiles = getParquetFiles(hadoopConf, 
basePath);
+    LocatedFileStatus file = getParquetFileAt(parquetFiles, modificationTime);
+    return createInternalTableFromFile(file);
+  }
+
+  private Stream<InternalDataFile> 
getInternalDataFiles(Stream<LocatedFileStatus> parquetFiles) {
+    return parquetFiles.map(
+        file ->
+            InternalDataFile.builder()
+                .physicalPath(file.getPath().toString())
+                .fileFormat(FileFormat.APACHE_PARQUET)
+                .fileSizeBytes(file.getLen())
+                .partitionValues(
+                    partitionValueExtractor.extractPartitionValues(
+                        partitionSpecExtractor.spec(
+                            
partitionValueExtractor.extractSchemaForParquetPartitions(
+                                parquetMetadataExtractor.readParquetMetadata(
+                                    hadoopConf, file.getPath()),
+                                file.getPath().toString())),
+                        HudiPathUtils.getPartitionPath(
+                            new Path(basePath), new 
Path(file.getPath().toString()))))
+                .lastModified(file.getModificationTime())
+                .columnStats(
+                    parquetStatsExtractor.getColumnStatsForaFile(
+                        
parquetMetadataExtractor.readParquetMetadata(hadoopConf, file.getPath())))
+                .build());
+  }
+
+  private InternalDataFile createInternalDataFileFromParquetFile(FileStatus 
parquetFile) {
+    return InternalDataFile.builder()
+        .physicalPath(parquetFile.getPath().toString())
+        .partitionValues(
+            partitionValueExtractor.extractPartitionValues(
+                partitionSpecExtractor.spec(
+                    partitionValueExtractor.extractSchemaForParquetPartitions(
+                        parquetMetadataExtractor.readParquetMetadata(
+                            hadoopConf, parquetFile.getPath()),
+                        parquetFile.getPath().toString())),
+                basePath))
+        .lastModified(parquetFile.getModificationTime())
+        .fileSizeBytes(parquetFile.getLen())
+        .columnStats(
+            parquetStatsExtractor.getColumnStatsForaFile(
+                parquetMetadataExtractor.readParquetMetadata(hadoopConf, 
parquetFile.getPath())))
+        .build();
+  }
+
+  @Override
+  public CommitsBacklog<Long> getCommitsBacklog(InstantsForIncrementalSync 
syncInstants) {
+    List<Long> commitsToProcess =
+        
Collections.singletonList(syncInstants.getLastSyncInstant().toEpochMilli());
+    return 
CommitsBacklog.<Long>builder().commitsToProcess(commitsToProcess).build();
+  }
+
+  @Override
+  public TableChange getTableChangeForCommit(Long modificationTime) {
+    Stream<LocatedFileStatus> parquetFiles = getParquetFiles(hadoopConf, 
basePath);
+    Set<InternalDataFile> addedInternalDataFiles = new HashSet<>();
+
+    List<FileStatus> tableChangesAfter =
+        parquetFiles
+            .filter(fileStatus -> fileStatus.getModificationTime() > 
modificationTime)
+            .collect(Collectors.toList());
+    InternalTable internalTable = getMostRecentTable(parquetFiles);
+    for (FileStatus tableStatus : tableChangesAfter) {
+      InternalDataFile currentDataFile = 
createInternalDataFileFromParquetFile(tableStatus);
+      addedInternalDataFiles.add(currentDataFile);
+    }
+
+    return TableChange.builder()
+        .tableAsOfChange(internalTable)
+        
.filesDiff(InternalFilesDiff.builder().filesAdded(addedInternalDataFiles).build())
+        .build();
+  }
+
+  private InternalTable getMostRecentTable(Stream<LocatedFileStatus> 
parquetFiles) {
+    LocatedFileStatus latestFile = getMostRecentParquetFile(parquetFiles);
+    return createInternalTableFromFile(latestFile);
+  }
+
+  @Override
+  public InternalTable getCurrentTable() {
+    Stream<LocatedFileStatus> parquetFiles = getParquetFiles(hadoopConf, 
basePath);
+    return getMostRecentTable(parquetFiles);
+  }
+
+  /**
+   * get current snapshot
+   *
+   * @return
+   */
+  @Override
+  public InternalSnapshot getCurrentSnapshot() {
+    // to avoid consume the stream call the method twice to return the same 
stream of parquet files
+    Stream<InternalDataFile> internalDataFiles =
+        getInternalDataFiles(getParquetFiles(hadoopConf, basePath));
+    InternalTable table = getMostRecentTable(getParquetFiles(hadoopConf, 
basePath));
+    return InternalSnapshot.builder()
+        .table(table)
+        .sourceIdentifier(
+            getCommitIdentifier(
+                getMostRecentParquetFile(getParquetFiles(hadoopConf, basePath))
+                    .getModificationTime()))
+        .partitionedDataFiles(PartitionFileGroup.fromFiles(internalDataFiles))
+        .build();
+  }
+
+  private LocatedFileStatus getMostRecentParquetFile(Stream<LocatedFileStatus> 
parquetFiles) {
+    return parquetFiles
+        .max(Comparator.comparing(FileStatus::getModificationTime))
+        .orElseThrow(() -> new IllegalStateException("No files found"));
+  }
+
+  private LocatedFileStatus getParquetFileAt(
+      Stream<LocatedFileStatus> parquetFiles, long modificationTime) {
+    return parquetFiles
+        .filter(fileStatus -> fileStatus.getModificationTime() == 
modificationTime)
+        .findFirst()
+        .orElseThrow(
+            () -> new IllegalStateException("No file found at " + 
Long.valueOf(modificationTime)));
+  }
+
+  private Stream<LocatedFileStatus> getParquetFiles(Configuration hadoopConf, 
String basePath) {
+    try {
+      FileSystem fs = FileSystem.get(hadoopConf);
+      URI uriBasePath = new URI(basePath);
+      String parentPath = Paths.get(uriBasePath).toString();
+      RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(new 
Path(parentPath), true);
+      return RemoteIterators.toList(iterator).stream()
+          .filter(file -> 
file.getPath().getName().toString().endsWith("parquet"));
+    } catch (IOException | URISyntaxException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public boolean isIncrementalSyncSafeFrom(Instant instant) {
+    long modficationTime = instant.getEpochSecond();
+    Stream<LocatedFileStatus> parquetFiles = getParquetFiles(hadoopConf, 
basePath);
+    LocatedFileStatus parquetFile = getMostRecentParquetFile(parquetFiles);
+    Path parquetFilePath = parquetFile.getPath();
+    while (parquetFile.isFile() && parquetFile.getModificationTime() > 
modficationTime) {
+      // check the preceeding parquetFile
+      parquetFiles.filter(file -> !file.getPath().equals(parquetFilePath));

Review Comment:
   The output of this filter is not being used; what is the intention of this code?
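   For reference, `Stream.filter` returns a new stream rather than modifying the receiver, so the result has to be captured for the call to have any effect. A minimal sketch, assuming the intent is to drop the current file from the stream:
   ```java
   parquetFiles = parquetFiles.filter(file -> !file.getPath().equals(parquetFilePath));
   ```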



##########
xtable-core/src/test/java/org/apache/xtable/parquet/ITParquetConversionSource.java:
##########
@@ -0,0 +1,435 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.parquet;
+
+import static org.apache.xtable.GenericTable.getTableName;
+import static org.apache.xtable.model.storage.TableFormat.*;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+import java.net.URISyntaxException;
+import java.nio.file.Path;
+import java.sql.Timestamp;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.ChronoUnit;
+import java.util.*;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import lombok.Builder;
+import lombok.Value;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.*;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.MetadataBuilder;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import org.apache.hudi.client.HoodieReadClient;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.xtable.GenericTable;
+import org.apache.xtable.conversion.*;
+import org.apache.xtable.conversion.ConversionConfig;
+import org.apache.xtable.conversion.ConversionController;
+import org.apache.xtable.conversion.ConversionSourceProvider;
+import org.apache.xtable.conversion.SourceTable;
+import org.apache.xtable.conversion.TargetTable;
+import org.apache.xtable.hudi.HudiTestUtil;
+import org.apache.xtable.model.sync.SyncMode;
+
+public class ITParquetConversionSource {
+  private static final DateTimeFormatter DATE_FORMAT =
+      DateTimeFormatter.ofPattern("yyyy-MM-dd 
HH:mm:ss.SSS").withZone(ZoneId.of("UTC"));
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  public static final String PARTITION_FIELD_SPEC_CONFIG =
+      "xtable.parquet.source.partition_field_spec_config";
+  @TempDir public static Path tempDir;
+  private static JavaSparkContext jsc;
+  private static SparkSession sparkSession;
+  private static StructType schema;
+
+  @BeforeAll
+  public static void setupOnce() {
+    SparkConf sparkConf = HudiTestUtil.getSparkConf(tempDir);
+
+    String extraJavaOptions = "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED";
+    sparkConf.set("spark.driver.extraJavaOptions", extraJavaOptions);
+    sparkConf = HoodieReadClient.addHoodieSupport(sparkConf);
+    sparkConf.set("parquet.avro.write-old-list-structure", "false");
+
+    String javaOpts =
+        "--add-opens=java.base/java.nio=ALL-UNNAMED "
+            + "--add-opens=java.base/java.lang=ALL-UNNAMED "
+            + "--add-opens=java.base/java.util=ALL-UNNAMED "
+            + "--add-opens=java.base/java.util.concurrent=ALL-UNNAMED "
+            + "--add-opens=java.base/java.io=ALL-UNNAMED"
+            + "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED"
+            + "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED";
+
+    sparkConf.set("spark.driver.extraJavaOptions", javaOpts);
+    sparkConf.set("spark.executor.extraJavaOptions", javaOpts);
+
+    sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();
+    jsc = JavaSparkContext.fromSparkContext(sparkSession.sparkContext());
+
+    List<Row> data =
+        Arrays.asList(
+            RowFactory.create(1, "Alice", 30, new 
Timestamp(System.currentTimeMillis())),
+            RowFactory.create(2, "Bob", 24, new 
Timestamp(System.currentTimeMillis() + 1000)),
+            RowFactory.create(3, "Charlie", 35, new 
Timestamp(System.currentTimeMillis() + 2000)),
+            RowFactory.create(4, "David", 29, new 
Timestamp(System.currentTimeMillis() + 3000)),
+            RowFactory.create(5, "Eve", 22, new 
Timestamp(System.currentTimeMillis() + 4000)));
+
+    schema =
+        DataTypes.createStructType(
+            new StructField[] {
+              DataTypes.createStructField("id", DataTypes.IntegerType, false),
+              DataTypes.createStructField("name", DataTypes.StringType, false),
+              DataTypes.createStructField("age", DataTypes.IntegerType, false),
+              DataTypes.createStructField(
+                  "timestamp",
+                  DataTypes.TimestampType,
+                  false,
+                  new MetadataBuilder().putString("precision", 
"millis").build())
+            });
+
+    Dataset<Row> df = sparkSession.createDataFrame(data, schema);
+    df.withColumn("year", 
functions.year(functions.col("timestamp").cast(DataTypes.TimestampType)))
+        .write()
+        .mode(SaveMode.Overwrite)
+        .partitionBy("year")
+        .parquet(tempDir.toAbsolutePath().toString());
+
+    // test if data was written correctly
+    Dataset<Row> reloadedDf = 
sparkSession.read().parquet(tempDir.toAbsolutePath().toString());
+    reloadedDf.show();
+    reloadedDf.printSchema();
+  }
+
+  @AfterAll
+  public static void teardown() {
+    if (jsc != null) {
+      jsc.stop();
+      jsc = null;
+    }
+    if (sparkSession != null) {
+      sparkSession.stop();
+      sparkSession = null;
+    }
+  }
+
+  private static Stream<Arguments> provideArgsForFilePartitionTesting() {
+    String timestampFilter =
+        String.format(
+            "timestamp_micros_nullable_field < timestamp_millis(%s)",
+            Instant.now().truncatedTo(ChronoUnit.DAYS).minus(2, 
ChronoUnit.DAYS).toEpochMilli());
+    String levelFilter = "level = 'INFO'";
+    String nestedLevelFilter = "nested_record.level = 'INFO'";
+    String severityFilter = "severity = 1";
+    String timestampAndLevelFilter = String.format("%s and %s", 
timestampFilter, levelFilter);
+    return Stream.of(
+        Arguments.of(
+            buildArgsForPartition(
+                PARQUET,
+                Arrays.asList(ICEBERG, DELTA, HUDI),
+                "timestamp:YEAR:year=YYYY", // ts:DAY:year=YYYY/month=MM/day=DD

Review Comment:
   Is the plan to add these other cases?



##########
xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java:
##########
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.parquet;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+import java.time.Instant;
+import java.util.*;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import lombok.Builder;
+import lombok.NonNull;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.functional.RemoteIterators;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
+
+import org.apache.xtable.hudi.*;
+import org.apache.xtable.hudi.HudiPathUtils;
+import org.apache.xtable.model.*;
+import org.apache.xtable.model.CommitsBacklog;
+import org.apache.xtable.model.InstantsForIncrementalSync;
+import org.apache.xtable.model.TableChange;
+import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.storage.*;
+import org.apache.xtable.model.storage.FileFormat;
+import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.spi.extractor.ConversionSource;
+
+@Builder
+public class ParquetConversionSource implements ConversionSource<Long> {
+
+  private static final ParquetSchemaExtractor schemaExtractor =
+      ParquetSchemaExtractor.getInstance();
+
+  private static final ParquetMetadataExtractor parquetMetadataExtractor =
+      ParquetMetadataExtractor.getInstance();
+
+  private static final ParquetStatsExtractor parquetStatsExtractor =
+      ParquetStatsExtractor.getInstance();
+
+  private final ParquetPartitionValueExtractor partitionValueExtractor;
+  private final PathBasedPartitionSpecExtractor partitionSpecExtractor;
+  private final String tableName;
+  private final String basePath;
+  @NonNull private final Configuration hadoopConf;
+
+  private InternalTable createInternalTableFromFile(LocatedFileStatus 
latestFile) {
+    ParquetMetadata parquetMetadata =
+        parquetMetadataExtractor.readParquetMetadata(hadoopConf, 
latestFile.getPath());
+    MessageType parquetSchema = 
parquetMetadataExtractor.getSchema(parquetMetadata);
+    InternalSchema schema = schemaExtractor.toInternalSchema(parquetSchema, 
"");
+    List<InternalPartitionField> partitionFields = 
partitionSpecExtractor.spec(schema);
+
+    DataLayoutStrategy dataLayoutStrategy =
+        partitionFields.isEmpty()
+            ? DataLayoutStrategy.FLAT
+            : DataLayoutStrategy.HIVE_STYLE_PARTITION;
+    return InternalTable.builder()
+        .tableFormat(TableFormat.PARQUET)
+        .basePath(basePath)
+        .name(tableName)
+        .layoutStrategy(dataLayoutStrategy)
+        .partitioningFields(partitionFields)
+        .readSchema(schema)
+        
.latestCommitTime(Instant.ofEpochMilli(latestFile.getModificationTime()))
+        .build();
+  }
+
+  @Override
+  public InternalTable getTable(Long modificationTime) {
+    // get parquetFile at specific time modificationTime
+    Stream<LocatedFileStatus> parquetFiles = getParquetFiles(hadoopConf, 
basePath);
+    LocatedFileStatus file = getParquetFileAt(parquetFiles, modificationTime);
+    return createInternalTableFromFile(file);
+  }
+
+  private Stream<InternalDataFile> 
getInternalDataFiles(Stream<LocatedFileStatus> parquetFiles) {
+    return parquetFiles.map(
+        file ->
+            InternalDataFile.builder()
+                .physicalPath(file.getPath().toString())
+                .fileFormat(FileFormat.APACHE_PARQUET)
+                .fileSizeBytes(file.getLen())
+                .partitionValues(
+                    partitionValueExtractor.extractPartitionValues(
+                        partitionSpecExtractor.spec(
+                            
partitionValueExtractor.extractSchemaForParquetPartitions(
+                                parquetMetadataExtractor.readParquetMetadata(
+                                    hadoopConf, file.getPath()),
+                                file.getPath().toString())),
+                        HudiPathUtils.getPartitionPath(
+                            new Path(basePath), new 
Path(file.getPath().toString()))))

Review Comment:
   ```suggestion
                               new Path(basePath), file.getPath())))
   ```



##########
xtable-core/src/main/java/org/apache/xtable/parquet/ParquetSchemaExtractor.java:
##########
@@ -295,15 +299,30 @@ public InternalSchema toInternalSchema(Type schema, String parentPath) {
                   .fieldId(fieldId == null ? null : fieldId.intValue())
                   .build());
         }
-        if (currentRepetition != Repetition.REPEATED
-            && schema.asGroupType().getName() != "list"
+        // RECORD Type (non-nullable elements)
+        if (schema.asGroupType().getName() != "list"

Review Comment:
   String equality checks in Java need to use `.equals`.
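   For example, the quoted `!=` comparison could be written as (sketch only):
   ```java
   !"list".equals(schema.asGroupType().getName())
   ```
   Putting the literal first also keeps the check null-safe.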



##########
xtable-core/src/main/java/org/apache/xtable/parquet/ParquetStatsExtractor.java:
##########
@@ -132,19 +181,23 @@ public static InternalDataFile toInternalDataFile(Configuration hadoopConf, Path
     try {
       FileSystem fs = FileSystem.get(hadoopConf);
       file = fs.getFileStatus(parentPath);
-      // InputPartitionFields partitionInfo = initPartitionInfo();
      footer = parquetMetadataExtractor.readParquetMetadata(hadoopConf, parentPath);
       MessageType schema = parquetMetadataExtractor.getSchema(footer);
       columnStatsForAFile = getColumnStatsForaFile(footer);
-      // partitionValues = partitionExtractor.createPartitionValues(
-      //               partitionInfo);
+      partitionValues =
+          partitionValueExtractor.extractPartitionValues(
+              partitionSpecExtractor.spec(
+                  partitionValueExtractor.extractSchemaForParquetPartitions(
+                      parquetMetadataExtractor.readParquetMetadata(hadoopConf, file.getPath()),
+                      file.getPath().toString())),
+              parentPath.toString());
     } catch (java.io.IOException e) {

Review Comment:
   @unical1988 can you address this?



##########
pom.xml:
##########
@@ -674,6 +676,21 @@
                         </goals>
                     </execution>
                 </executions>
+            </plugin>-->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>

Review Comment:
   This is already declared on line 717; can you remove that definition?



##########
xtable-core/src/main/java/org/apache/xtable/parquet/ParquetStatsConverterUtil.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.parquet;
+
+import java.nio.charset.StandardCharsets;
+
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.PrimitiveType;
+
+public class ParquetStatsConverterUtil {
+  public static Object convertStatBinaryTypeToLogicalType(
+      ColumnChunkMetaData columnMetaData, boolean isMin) {
+    Object returnedObj = null;
+    PrimitiveType primitiveType = columnMetaData.getPrimitiveType();
+    switch (primitiveType.getPrimitiveTypeName()) {
+      case BINARY: // TODO check if other primitiveType' needs to be handled as well

Review Comment:
   Can you create a new GitHub issue and reference it as part of the TODO? It will help in tracking the future work.



##########
xtable-core/src/test/java/org/apache/xtable/parquet/ITParquetConversionSource.java:
##########
@@ -0,0 +1,435 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.parquet;
+
+import static org.apache.xtable.GenericTable.getTableName;
+import static org.apache.xtable.model.storage.TableFormat.*;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+import java.net.URISyntaxException;
+import java.nio.file.Path;
+import java.sql.Timestamp;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.ChronoUnit;
+import java.util.*;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import lombok.Builder;
+import lombok.Value;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.*;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.MetadataBuilder;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import org.apache.hudi.client.HoodieReadClient;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.xtable.GenericTable;
+import org.apache.xtable.conversion.*;
+import org.apache.xtable.conversion.ConversionConfig;
+import org.apache.xtable.conversion.ConversionController;
+import org.apache.xtable.conversion.ConversionSourceProvider;
+import org.apache.xtable.conversion.SourceTable;
+import org.apache.xtable.conversion.TargetTable;
+import org.apache.xtable.hudi.HudiTestUtil;
+import org.apache.xtable.model.sync.SyncMode;
+
+public class ITParquetConversionSource {
+  private static final DateTimeFormatter DATE_FORMAT =
+      DateTimeFormatter.ofPattern("yyyy-MM-dd 
HH:mm:ss.SSS").withZone(ZoneId.of("UTC"));
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  public static final String PARTITION_FIELD_SPEC_CONFIG =
+      "xtable.parquet.source.partition_field_spec_config";
+  @TempDir public static Path tempDir;
+  private static JavaSparkContext jsc;
+  private static SparkSession sparkSession;
+  private static StructType schema;
+
+  @BeforeAll
+  public static void setupOnce() {
+    SparkConf sparkConf = HudiTestUtil.getSparkConf(tempDir);
+
+    String extraJavaOptions = "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED";
+    sparkConf.set("spark.driver.extraJavaOptions", extraJavaOptions);
+    sparkConf = HoodieReadClient.addHoodieSupport(sparkConf);
+    sparkConf.set("parquet.avro.write-old-list-structure", "false");
+
+    String javaOpts =
+        "--add-opens=java.base/java.nio=ALL-UNNAMED "
+            + "--add-opens=java.base/java.lang=ALL-UNNAMED "
+            + "--add-opens=java.base/java.util=ALL-UNNAMED "
+            + "--add-opens=java.base/java.util.concurrent=ALL-UNNAMED "
+            + "--add-opens=java.base/java.io=ALL-UNNAMED"
+            + "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED"
+            + "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED";
+
+    sparkConf.set("spark.driver.extraJavaOptions", javaOpts);
+    sparkConf.set("spark.executor.extraJavaOptions", javaOpts);
+
+    sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();
+    jsc = JavaSparkContext.fromSparkContext(sparkSession.sparkContext());
+
+    List<Row> data =
+        Arrays.asList(
+            RowFactory.create(1, "Alice", 30, new 
Timestamp(System.currentTimeMillis())),
+            RowFactory.create(2, "Bob", 24, new 
Timestamp(System.currentTimeMillis() + 1000)),
+            RowFactory.create(3, "Charlie", 35, new 
Timestamp(System.currentTimeMillis() + 2000)),
+            RowFactory.create(4, "David", 29, new 
Timestamp(System.currentTimeMillis() + 3000)),
+            RowFactory.create(5, "Eve", 22, new 
Timestamp(System.currentTimeMillis() + 4000)));
+
+    schema =
+        DataTypes.createStructType(
+            new StructField[] {
+              DataTypes.createStructField("id", DataTypes.IntegerType, false),
+              DataTypes.createStructField("name", DataTypes.StringType, false),
+              DataTypes.createStructField("age", DataTypes.IntegerType, false),
+              DataTypes.createStructField(
+                  "timestamp",
+                  DataTypes.TimestampType,
+                  false,
+                  new MetadataBuilder().putString("precision", 
"millis").build())
+            });
+
+    Dataset<Row> df = sparkSession.createDataFrame(data, schema);
+    df.withColumn("year", 
functions.year(functions.col("timestamp").cast(DataTypes.TimestampType)))
+        .write()
+        .mode(SaveMode.Overwrite)
+        .partitionBy("year")
+        .parquet(tempDir.toAbsolutePath().toString());
+
+    // test if data was written correctly
+    Dataset<Row> reloadedDf = 
sparkSession.read().parquet(tempDir.toAbsolutePath().toString());
+    reloadedDf.show();
+    reloadedDf.printSchema();
+  }
+
+  @AfterAll
+  public static void teardown() {
+    if (jsc != null) {
+      jsc.stop();
+      jsc = null;
+    }
+    if (sparkSession != null) {
+      sparkSession.stop();
+      sparkSession = null;
+    }
+  }
+
+  private static Stream<Arguments> provideArgsForFilePartitionTesting() {
+    String timestampFilter =
+        String.format(
+            "timestamp_micros_nullable_field < timestamp_millis(%s)",
+            Instant.now().truncatedTo(ChronoUnit.DAYS).minus(2, 
ChronoUnit.DAYS).toEpochMilli());
+    String levelFilter = "level = 'INFO'";
+    String nestedLevelFilter = "nested_record.level = 'INFO'";
+    String severityFilter = "severity = 1";
+    String timestampAndLevelFilter = String.format("%s and %s", 
timestampFilter, levelFilter);
+    return Stream.of(
+        Arguments.of(
+            buildArgsForPartition(
+                PARQUET,
+                Arrays.asList(ICEBERG, DELTA, HUDI),
+                "timestamp:YEAR:year=YYYY", // ts:DAY:year=YYYY/month=MM/day=DD
+                // "year=YYYY/month=MM/day=DD"
+                "timestamp:YEAR:year=YYYY",
+                levelFilter)));
+  }
+
+  private static TableFormatPartitionDataHolder buildArgsForPartition(
+      String sourceFormat,
+      List<String> targetFormats,
+      String hudiPartitionConfig,
+      String xTablePartitionConfig,
+      String filter) {
+    return TableFormatPartitionDataHolder.builder()
+        .sourceTableFormat(sourceFormat)
+        .targetTableFormats(targetFormats)
+        .hudiSourceConfig(Optional.ofNullable(hudiPartitionConfig))
+        .xTablePartitionConfig(xTablePartitionConfig)
+        .filter(filter)
+        .build();
+  }
+
+  private static ConversionConfig getTableSyncConfig(
+      String sourceTableFormat,
+      SyncMode syncMode,
+      String tableName,
+      GenericTable table,
+      List<String> targetTableFormats,
+      String partitionConfig,
+      Duration metadataRetention) {
+    Properties sourceProperties = new Properties();
+    if (partitionConfig != null) {
+      sourceProperties.put(PARTITION_FIELD_SPEC_CONFIG, partitionConfig);
+    }
+    SourceTable sourceTable =
+        SourceTable.builder()
+            .name(tableName)
+            .formatName(sourceTableFormat)
+            .basePath(table.getBasePath())
+            .dataPath(table.getDataPath())
+            .additionalProperties(sourceProperties)
+            .build();
+
+    List<TargetTable> targetTables =
+        targetTableFormats.stream()
+            .map(
+                formatName ->
+                    TargetTable.builder()
+                        .name(tableName)
+                        .formatName(formatName)
+                        // set the metadata path to the data path as the 
default (required by Hudi)
+                        .basePath(table.getDataPath())
+                        .metadataRetention(metadataRetention)
+                        .build())
+            .collect(Collectors.toList());
+
+    return ConversionConfig.builder()
+        .sourceTable(sourceTable)
+        .targetTables(targetTables)
+        .syncMode(syncMode)
+        .build();
+  }
+
+  private ConversionSourceProvider<?> getConversionSourceProvider(String 
sourceTableFormat) {
+    if (sourceTableFormat.equalsIgnoreCase(PARQUET)) {
+      ConversionSourceProvider<Long> parquetConversionSourceProvider =
+          new ParquetConversionSourceProvider();
+      parquetConversionSourceProvider.init(jsc.hadoopConfiguration());
+      return parquetConversionSourceProvider;
+    } else {
+      throw new IllegalArgumentException("Unsupported source format: " + 
sourceTableFormat);
+    }
+  }
+
+  @ParameterizedTest
+  @MethodSource("provideArgsForFilePartitionTesting")
+  public void testFilePartitionedData(

Review Comment:
   Can you add a case where the data is not partitioned as well?
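   For example, a hypothetical extra argument set reusing the existing `buildArgsForPartition` helper (the `null` partition configs are an assumption about how the unpartitioned case would be expressed):
   ```java
   Arguments.of(
       buildArgsForPartition(
           PARQUET,
           Arrays.asList(ICEBERG, DELTA, HUDI),
           null, // no Hudi partition config
           null, // no XTable partition config, i.e. data written without partitionBy
           levelFilter))
   ```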



##########
xtable-core/src/test/java/org/apache/xtable/parquet/ITParquetConversionSource.java:
##########
@@ -0,0 +1,435 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.parquet;
+
+import static org.apache.xtable.GenericTable.getTableName;
+import static org.apache.xtable.model.storage.TableFormat.*;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+import java.net.URISyntaxException;
+import java.nio.file.Path;
+import java.sql.Timestamp;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.ChronoUnit;
+import java.util.*;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import lombok.Builder;
+import lombok.Value;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.*;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.MetadataBuilder;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import org.apache.hudi.client.HoodieReadClient;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.xtable.GenericTable;
+import org.apache.xtable.conversion.*;
+import org.apache.xtable.conversion.ConversionConfig;
+import org.apache.xtable.conversion.ConversionController;
+import org.apache.xtable.conversion.ConversionSourceProvider;
+import org.apache.xtable.conversion.SourceTable;
+import org.apache.xtable.conversion.TargetTable;
+import org.apache.xtable.hudi.HudiTestUtil;
+import org.apache.xtable.model.sync.SyncMode;
+
+public class ITParquetConversionSource {
+  private static final DateTimeFormatter DATE_FORMAT =
+      DateTimeFormatter.ofPattern("yyyy-MM-dd 
HH:mm:ss.SSS").withZone(ZoneId.of("UTC"));
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  public static final String PARTITION_FIELD_SPEC_CONFIG =
+      "xtable.parquet.source.partition_field_spec_config";
+  @TempDir public static Path tempDir;
+  private static JavaSparkContext jsc;
+  private static SparkSession sparkSession;
+  private static StructType schema;
+
+  @BeforeAll
+  public static void setupOnce() {
+    SparkConf sparkConf = HudiTestUtil.getSparkConf(tempDir);
+
+    String extraJavaOptions = "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED";
+    sparkConf.set("spark.driver.extraJavaOptions", extraJavaOptions);
+    sparkConf = HoodieReadClient.addHoodieSupport(sparkConf);
+    sparkConf.set("parquet.avro.write-old-list-structure", "false");
+
+    String javaOpts =
+        "--add-opens=java.base/java.nio=ALL-UNNAMED "
+            + "--add-opens=java.base/java.lang=ALL-UNNAMED "
+            + "--add-opens=java.base/java.util=ALL-UNNAMED "
+            + "--add-opens=java.base/java.util.concurrent=ALL-UNNAMED "
+            + "--add-opens=java.base/java.io=ALL-UNNAMED"
+            + "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED"
+            + "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED";
+
+    sparkConf.set("spark.driver.extraJavaOptions", javaOpts);
+    sparkConf.set("spark.executor.extraJavaOptions", javaOpts);
+
+    sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();
+    jsc = JavaSparkContext.fromSparkContext(sparkSession.sparkContext());
+
+    List<Row> data =
+        Arrays.asList(
+            RowFactory.create(1, "Alice", 30, new 
Timestamp(System.currentTimeMillis())),
+            RowFactory.create(2, "Bob", 24, new 
Timestamp(System.currentTimeMillis() + 1000)),
+            RowFactory.create(3, "Charlie", 35, new 
Timestamp(System.currentTimeMillis() + 2000)),
+            RowFactory.create(4, "David", 29, new 
Timestamp(System.currentTimeMillis() + 3000)),
+            RowFactory.create(5, "Eve", 22, new 
Timestamp(System.currentTimeMillis() + 4000)));
+
+    schema =
+        DataTypes.createStructType(
+            new StructField[] {
+              DataTypes.createStructField("id", DataTypes.IntegerType, false),
+              DataTypes.createStructField("name", DataTypes.StringType, false),
+              DataTypes.createStructField("age", DataTypes.IntegerType, false),
+              DataTypes.createStructField(
+                  "timestamp",
+                  DataTypes.TimestampType,
+                  false,
+                  new MetadataBuilder().putString("precision", 
"millis").build())
+            });
+
+    Dataset<Row> df = sparkSession.createDataFrame(data, schema);
+    df.withColumn("year", 
functions.year(functions.col("timestamp").cast(DataTypes.TimestampType)))
+        .write()
+        .mode(SaveMode.Overwrite)
+        .partitionBy("year")
+        .parquet(tempDir.toAbsolutePath().toString());
+
+    // test if data was written correctly
+    Dataset<Row> reloadedDf = 
sparkSession.read().parquet(tempDir.toAbsolutePath().toString());
+    reloadedDf.show();
+    reloadedDf.printSchema();
+  }
+
+  @AfterAll
+  public static void teardown() {
+    if (jsc != null) {
+      jsc.stop();
+      jsc = null;
+    }
+    if (sparkSession != null) {
+      sparkSession.stop();
+      sparkSession = null;
+    }
+  }
+
+  private static Stream<Arguments> provideArgsForFilePartitionTesting() {
+    String timestampFilter =
+        String.format(
+            "timestamp_micros_nullable_field < timestamp_millis(%s)",
+            Instant.now().truncatedTo(ChronoUnit.DAYS).minus(2, 
ChronoUnit.DAYS).toEpochMilli());
+    String levelFilter = "level = 'INFO'";
+    String nestedLevelFilter = "nested_record.level = 'INFO'";
+    String severityFilter = "severity = 1";
+    String timestampAndLevelFilter = String.format("%s and %s", 
timestampFilter, levelFilter);
+    return Stream.of(
+        Arguments.of(
+            buildArgsForPartition(
+                PARQUET,
+                Arrays.asList(ICEBERG, DELTA, HUDI),
+                "timestamp:YEAR:year=YYYY", // ts:DAY:year=YYYY/month=MM/day=DD
+                // "year=YYYY/month=MM/day=DD"
+                "timestamp:YEAR:year=YYYY",
+                levelFilter)));
+  }
+
+  private static TableFormatPartitionDataHolder buildArgsForPartition(
+      String sourceFormat,
+      List<String> targetFormats,
+      String hudiPartitionConfig,
+      String xTablePartitionConfig,
+      String filter) {
+    return TableFormatPartitionDataHolder.builder()
+        .sourceTableFormat(sourceFormat)
+        .targetTableFormats(targetFormats)
+        .hudiSourceConfig(Optional.ofNullable(hudiPartitionConfig))
+        .xTablePartitionConfig(xTablePartitionConfig)
+        .filter(filter)
+        .build();
+  }
+
+  private static ConversionConfig getTableSyncConfig(
+      String sourceTableFormat,
+      SyncMode syncMode,
+      String tableName,
+      GenericTable table,
+      List<String> targetTableFormats,
+      String partitionConfig,
+      Duration metadataRetention) {
+    Properties sourceProperties = new Properties();
+    if (partitionConfig != null) {
+      sourceProperties.put(PARTITION_FIELD_SPEC_CONFIG, partitionConfig);
+    }
+    SourceTable sourceTable =
+        SourceTable.builder()
+            .name(tableName)
+            .formatName(sourceTableFormat)
+            .basePath(table.getBasePath())
+            .dataPath(table.getDataPath())
+            .additionalProperties(sourceProperties)
+            .build();
+
+    List<TargetTable> targetTables =
+        targetTableFormats.stream()
+            .map(
+                formatName ->
+                    TargetTable.builder()
+                        .name(tableName)
+                        .formatName(formatName)
+                        // set the metadata path to the data path as the 
default (required by Hudi)
+                        .basePath(table.getDataPath())
+                        .metadataRetention(metadataRetention)
+                        .build())
+            .collect(Collectors.toList());
+
+    return ConversionConfig.builder()
+        .sourceTable(sourceTable)
+        .targetTables(targetTables)
+        .syncMode(syncMode)
+        .build();
+  }
+
+  private ConversionSourceProvider<?> getConversionSourceProvider(String 
sourceTableFormat) {
+    if (sourceTableFormat.equalsIgnoreCase(PARQUET)) {
+      ConversionSourceProvider<Long> parquetConversionSourceProvider =
+          new ParquetConversionSourceProvider();
+      parquetConversionSourceProvider.init(jsc.hadoopConfiguration());
+      return parquetConversionSourceProvider;
+    } else {
+      throw new IllegalArgumentException("Unsupported source format: " + 
sourceTableFormat);
+    }
+  }
+
+  @ParameterizedTest
+  @MethodSource("provideArgsForFilePartitionTesting")
+  public void testFilePartitionedData(
+      TableFormatPartitionDataHolder tableFormatPartitionDataHolder) {
+    String tableName = getTableName();
+    String sourceTableFormat = 
tableFormatPartitionDataHolder.getSourceTableFormat();
+    List<String> targetTableFormats = 
tableFormatPartitionDataHolder.getTargetTableFormats();
+    // Optional<String> hudiPartitionConfig = 
tableFormatPartitionDataHolder.getHudiSourceConfig();
+    String xTablePartitionConfig = 
tableFormatPartitionDataHolder.getXTablePartitionConfig();
+    String filter = tableFormatPartitionDataHolder.getFilter();
+    ConversionSourceProvider<?> conversionSourceProvider =
+        getConversionSourceProvider(sourceTableFormat);
+    GenericTable table;
+    table =
+        GenericTable.getInstance(tableName, tempDir, sparkSession, jsc, 
sourceTableFormat, true);
+    try (GenericTable tableToClose = table) {
+      ConversionConfig conversionConfig =
+          getTableSyncConfig(
+              sourceTableFormat,
+              SyncMode.FULL,
+              tableName,
+              table,
+              targetTableFormats,
+              xTablePartitionConfig,
+              null);
+      ConversionController conversionController =
+          new ConversionController(jsc.hadoopConfiguration());
+      conversionController.sync(conversionConfig, conversionSourceProvider);

Review Comment:
   Right now the test is only performing a single sync. We'll want to perform multiple syncs to ensure that the behavior is correct.
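   A rough sketch of what that could look like (hypothetical; assumes additional rows are appended to the source table between syncs):
   ```java
   conversionController.sync(conversionConfig, conversionSourceProvider);
   // append more rows to the source parquet table here, then sync again
   conversionController.sync(conversionConfig, conversionSourceProvider);
   // then assert the target tables reflect the newly added data
   ```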



##########
xtable-core/src/test/java/org/apache/xtable/parquet/ITParquetConversionSource.java:
##########
@@ -0,0 +1,435 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.parquet;
+
+import static org.apache.xtable.GenericTable.getTableName;
+import static org.apache.xtable.model.storage.TableFormat.*;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+import java.net.URISyntaxException;
+import java.nio.file.Path;
+import java.sql.Timestamp;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.ChronoUnit;
+import java.util.*;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import lombok.Builder;
+import lombok.Value;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.*;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.MetadataBuilder;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import org.apache.hudi.client.HoodieReadClient;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.xtable.GenericTable;
+import org.apache.xtable.conversion.*;
+import org.apache.xtable.conversion.ConversionConfig;
+import org.apache.xtable.conversion.ConversionController;
+import org.apache.xtable.conversion.ConversionSourceProvider;
+import org.apache.xtable.conversion.SourceTable;
+import org.apache.xtable.conversion.TargetTable;
+import org.apache.xtable.hudi.HudiTestUtil;
+import org.apache.xtable.model.sync.SyncMode;
+
+public class ITParquetConversionSource {
+  private static final DateTimeFormatter DATE_FORMAT =
+      DateTimeFormatter.ofPattern("yyyy-MM-dd 
HH:mm:ss.SSS").withZone(ZoneId.of("UTC"));
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  public static final String PARTITION_FIELD_SPEC_CONFIG =
+      "xtable.parquet.source.partition_field_spec_config";
+  @TempDir public static Path tempDir;
+  private static JavaSparkContext jsc;
+  private static SparkSession sparkSession;
+  private static StructType schema;
+
+  @BeforeAll
+  public static void setupOnce() {
+    SparkConf sparkConf = HudiTestUtil.getSparkConf(tempDir);
+
+    String extraJavaOptions = "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED";
+    sparkConf.set("spark.driver.extraJavaOptions", extraJavaOptions);
+    sparkConf = HoodieReadClient.addHoodieSupport(sparkConf);
+    sparkConf.set("parquet.avro.write-old-list-structure", "false");
+
+    String javaOpts =
+        "--add-opens=java.base/java.nio=ALL-UNNAMED "
+            + "--add-opens=java.base/java.lang=ALL-UNNAMED "
+            + "--add-opens=java.base/java.util=ALL-UNNAMED "
+            + "--add-opens=java.base/java.util.concurrent=ALL-UNNAMED "
+            + "--add-opens=java.base/java.io=ALL-UNNAMED"
+            + "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED"
+            + "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED";
+
+    sparkConf.set("spark.driver.extraJavaOptions", javaOpts);
+    sparkConf.set("spark.executor.extraJavaOptions", javaOpts);
+
+    sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();
+    jsc = JavaSparkContext.fromSparkContext(sparkSession.sparkContext());
+
+    List<Row> data =
+        Arrays.asList(
+            RowFactory.create(1, "Alice", 30, new 
Timestamp(System.currentTimeMillis())),
+            RowFactory.create(2, "Bob", 24, new 
Timestamp(System.currentTimeMillis() + 1000)),
+            RowFactory.create(3, "Charlie", 35, new 
Timestamp(System.currentTimeMillis() + 2000)),
+            RowFactory.create(4, "David", 29, new 
Timestamp(System.currentTimeMillis() + 3000)),
+            RowFactory.create(5, "Eve", 22, new 
Timestamp(System.currentTimeMillis() + 4000)));
+
+    schema =
+        DataTypes.createStructType(
+            new StructField[] {
+              DataTypes.createStructField("id", DataTypes.IntegerType, false),
+              DataTypes.createStructField("name", DataTypes.StringType, false),
+              DataTypes.createStructField("age", DataTypes.IntegerType, false),
+              DataTypes.createStructField(
+                  "timestamp",
+                  DataTypes.TimestampType,
+                  false,
+                  new MetadataBuilder().putString("precision", "millis").build())
+            });
+
+    Dataset<Row> df = sparkSession.createDataFrame(data, schema);
+    df.withColumn("year", 
functions.year(functions.col("timestamp").cast(DataTypes.TimestampType)))
+        .write()
+        .mode(SaveMode.Overwrite)
+        .partitionBy("year")
+        .parquet(tempDir.toAbsolutePath().toString());
+
+    // test if data was written correctly
+    Dataset<Row> reloadedDf = sparkSession.read().parquet(tempDir.toAbsolutePath().toString());
+    reloadedDf.show();
+    reloadedDf.printSchema();
+  }
+
+  @AfterAll
+  public static void teardown() {
+    if (jsc != null) {
+      jsc.stop();
+      jsc = null;
+    }
+    if (sparkSession != null) {
+      sparkSession.stop();
+      sparkSession = null;
+    }
+  }
+
+  private static Stream<Arguments> provideArgsForFilePartitionTesting() {
+    String timestampFilter =
+        String.format(
+            "timestamp_micros_nullable_field < timestamp_millis(%s)",
+            Instant.now().truncatedTo(ChronoUnit.DAYS).minus(2, ChronoUnit.DAYS).toEpochMilli());
+    String levelFilter = "level = 'INFO'";
+    String nestedLevelFilter = "nested_record.level = 'INFO'";
+    String severityFilter = "severity = 1";
+    String timestampAndLevelFilter = String.format("%s and %s", timestampFilter, levelFilter);
+    return Stream.of(
+        Arguments.of(
+            buildArgsForPartition(
+                PARQUET,
+                Arrays.asList(ICEBERG, DELTA, HUDI),
+                "timestamp:YEAR:year=YYYY", // ts:DAY:year=YYYY/month=MM/day=DD
+                // "year=YYYY/month=MM/day=DD"
+                "timestamp:YEAR:year=YYYY",
+                levelFilter)));
+  }
+
+  private static TableFormatPartitionDataHolder buildArgsForPartition(
+      String sourceFormat,
+      List<String> targetFormats,
+      String hudiPartitionConfig,
+      String xTablePartitionConfig,
+      String filter) {
+    return TableFormatPartitionDataHolder.builder()
+        .sourceTableFormat(sourceFormat)
+        .targetTableFormats(targetFormats)
+        .hudiSourceConfig(Optional.ofNullable(hudiPartitionConfig))
+        .xTablePartitionConfig(xTablePartitionConfig)
+        .filter(filter)
+        .build();
+  }
+
+  private static ConversionConfig getTableSyncConfig(
+      String sourceTableFormat,
+      SyncMode syncMode,
+      String tableName,
+      GenericTable table,
+      List<String> targetTableFormats,
+      String partitionConfig,
+      Duration metadataRetention) {
+    Properties sourceProperties = new Properties();
+    if (partitionConfig != null) {
+      sourceProperties.put(PARTITION_FIELD_SPEC_CONFIG, partitionConfig);
+    }
+    SourceTable sourceTable =
+        SourceTable.builder()
+            .name(tableName)
+            .formatName(sourceTableFormat)
+            .basePath(table.getBasePath())
+            .dataPath(table.getDataPath())
+            .additionalProperties(sourceProperties)
+            .build();
+
+    List<TargetTable> targetTables =
+        targetTableFormats.stream()
+            .map(
+                formatName ->
+                    TargetTable.builder()
+                        .name(tableName)
+                        .formatName(formatName)
+                        // set the metadata path to the data path as the default (required by Hudi)
+                        .basePath(table.getDataPath())
+                        .metadataRetention(metadataRetention)
+                        .build())
+            .collect(Collectors.toList());
+
+    return ConversionConfig.builder()
+        .sourceTable(sourceTable)
+        .targetTables(targetTables)
+        .syncMode(syncMode)
+        .build();
+  }
+
+  private ConversionSourceProvider<?> getConversionSourceProvider(String sourceTableFormat) {
+    if (sourceTableFormat.equalsIgnoreCase(PARQUET)) {
+      ConversionSourceProvider<Long> parquetConversionSourceProvider =
+          new ParquetConversionSourceProvider();
+      parquetConversionSourceProvider.init(jsc.hadoopConfiguration());
+      return parquetConversionSourceProvider;
+    } else {
+      throw new IllegalArgumentException("Unsupported source format: " + sourceTableFormat);
+    }
+  }
+
+  @ParameterizedTest
+  @MethodSource("provideArgsForFilePartitionTesting")
+  public void testFilePartitionedData(
+      TableFormatPartitionDataHolder tableFormatPartitionDataHolder) {
+    String tableName = getTableName();
+    String sourceTableFormat = tableFormatPartitionDataHolder.getSourceTableFormat();
+    List<String> targetTableFormats = tableFormatPartitionDataHolder.getTargetTableFormats();
+    // Optional<String> hudiPartitionConfig = tableFormatPartitionDataHolder.getHudiSourceConfig();
+    String xTablePartitionConfig = tableFormatPartitionDataHolder.getXTablePartitionConfig();
+    String filter = tableFormatPartitionDataHolder.getFilter();
+    ConversionSourceProvider<?> conversionSourceProvider =
+        getConversionSourceProvider(sourceTableFormat);
+    GenericTable table;
+    table =
+        GenericTable.getInstance(tableName, tempDir, sparkSession, jsc, sourceTableFormat, true);
+    try (GenericTable tableToClose = table) {
+      ConversionConfig conversionConfig =
+          getTableSyncConfig(
+              sourceTableFormat,
+              SyncMode.FULL,
+              tableName,
+              table,
+              targetTableFormats,
+              xTablePartitionConfig,
+              null);
+      ConversionController conversionController =
+          new ConversionController(jsc.hadoopConfiguration());
+      conversionController.sync(conversionConfig, conversionSourceProvider);
+      checkDatasetEquivalenceWithFilter(
+          sourceTableFormat, tableToClose, targetTableFormats, filter);
+    } catch (URISyntaxException e) {
+      e.printStackTrace();
+    }

Review Comment:
   This will hide errors due to exceptions. You can just let the test throw exceptions.
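   For example, the catch could be removed and the exception declared on the test method so the original failure shows up in the test report. A rough sketch only, reusing the helpers already present in this diff (`getTableName`, `getTableSyncConfig`, `checkDatasetEquivalenceWithFilter`) and declaring a deliberately broad `throws Exception`:

```java
  @ParameterizedTest
  @MethodSource("provideArgsForFilePartitionTesting")
  public void testFilePartitionedData(
      TableFormatPartitionDataHolder tableFormatPartitionDataHolder) throws Exception {
    String tableName = getTableName();
    String sourceTableFormat = tableFormatPartitionDataHolder.getSourceTableFormat();
    List<String> targetTableFormats = tableFormatPartitionDataHolder.getTargetTableFormats();
    String xTablePartitionConfig = tableFormatPartitionDataHolder.getXTablePartitionConfig();
    String filter = tableFormatPartitionDataHolder.getFilter();
    ConversionSourceProvider<?> conversionSourceProvider =
        getConversionSourceProvider(sourceTableFormat);
    // try-with-resources still closes the table; any URISyntaxException (or other failure)
    // now propagates and fails the test instead of being printed and swallowed
    try (GenericTable table =
        GenericTable.getInstance(tableName, tempDir, sparkSession, jsc, sourceTableFormat, true)) {
      ConversionConfig conversionConfig =
          getTableSyncConfig(
              sourceTableFormat,
              SyncMode.FULL,
              tableName,
              table,
              targetTableFormats,
              xTablePartitionConfig,
              null);
      ConversionController conversionController =
          new ConversionController(jsc.hadoopConfiguration());
      conversionController.sync(conversionConfig, conversionSourceProvider);
      checkDatasetEquivalenceWithFilter(sourceTableFormat, table, targetTableFormats, filter);
    }
  }
```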


