the-other-tim-brown commented on code in PR #592:
URL: https://github.com/apache/incubator-xtable/pull/592#discussion_r1875038720


##########
xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java:
##########
@@ -0,0 +1,209 @@
+package org.apache.xtable.parquet;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.*;
+import java.util.stream.Collectors;
+import lombok.Builder;
+import lombok.NonNull;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.xtable.avro.AvroSchemaConverter;
+import org.apache.xtable.model.*;
+import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.storage.*;
+import org.apache.xtable.spi.extractor.ConversionSource;
+
+@Builder
+public class ParquetConversionSource implements ConversionSource<Long> {
+
+  private final String tableName;
+  private final String basePath;
+  @NonNull private final Configuration hadoopConf;
+
+  @Builder.Default
+  private static final AvroSchemaConverter schemaExtractor = AvroSchemaConverter.getInstance();
+
+  @Builder.Default
+  private static final FileSystemHelper fsHelper = FileSystemHelper.getInstance();
+
+  @Builder.Default
+  private static final ParquetMetadataExtractor parquetMetadataExtractor =
+      ParquetMetadataExtractor.getInstance();
+
+  @Builder.Default
+  private static final ParquetPartitionHelper parquetPartitionHelper =
+      ParquetPartitionHelper.getInstance();
+
+  private Map<String, List<String>> initPartitionInfo() {
+    return fsHelper.getPartitionFromDirectoryStructure(
+        hadoopConf, basePath, Collections.emptyMap());
+  }
+
+  /**
+   * Infers the schema from the most recently modified file, on the assumption that the latest
+   * file will contain any newly added fields.
+   *
+   * @param modificationTime the commit to consider for reading the table state
+   * @return the table state derived from the latest parquet file
+   */
+  @Override
+  public InternalTable getTable(Long modificationTime) {
+
+    LocatedFileStatus latestFile =
+        fsHelper
+            .getParquetFiles(hadoopConf, basePath)
+            .max(Comparator.comparing(FileStatus::getModificationTime))
+            .orElseThrow(
+                () -> new IllegalStateException("No parquet files found under " + basePath));
+
+    ParquetMetadata parquetMetadata =
+        parquetMetadataExtractor.readParquetMetadata(hadoopConf, latestFile.getPath());
+    Schema tableSchema =
+        new org.apache.parquet.avro.AvroSchemaConverter()
+            .convert(parquetMetadata.getFileMetaData().getSchema());
+
+    Set<String> partitionKeys = initPartitionInfo().keySet();
+
+    // merge the partition columns into the file schema, since partition values are not stored
+    // in the parquet files themselves
+    if (!partitionKeys.isEmpty()) {
+      tableSchema = mergeAvroSchema(tableSchema, partitionKeys);
+    }
+    InternalSchema schema = schemaExtractor.toInternalSchema(tableSchema);
+
+    List<InternalPartitionField> partitionFields =
+        partitionKeys.isEmpty()
+            ? Collections.emptyList()
+            : parquetPartitionHelper.getInternalPartitionField(partitionKeys, schema);
+    DataLayoutStrategy dataLayoutStrategy =
+        partitionFields.isEmpty()
+            ? DataLayoutStrategy.FLAT
+            : DataLayoutStrategy.HIVE_STYLE_PARTITION;
+    return InternalTable.builder()
+        .tableFormat(TableFormat.PARQUET)
+        .basePath(basePath)
+        .name(tableName)
+        .layoutStrategy(dataLayoutStrategy)
+        .partitioningFields(partitionFields)
+        .readSchema(schema)
+        .latestCommitTime(Instant.ofEpochMilli(latestFile.getModificationTime()))
+        .build();
+  }
+
+  /**
+   * Builds the current snapshot by listing all files; -1 is passed to {@link #getTable(Long)} so
+   * that the full table state is read.
+   *
+   * @return the current snapshot of the table
+   */
+  @Override
+  public InternalSnapshot getCurrentSnapshot() {
+
+    List<LocatedFileStatus> parquetFiles =
+        fsHelper.getParquetFiles(hadoopConf, basePath).collect(Collectors.toList());
+    Map<String, List<String>> partitionInfo = initPartitionInfo();
+    InternalTable table = getTable(-1L);
+    List<InternalDataFile> internalDataFiles =
+        parquetFiles.stream()
+            .map(
+                file ->
+                    InternalDataFile.builder()
+                        .physicalPath(file.getPath().toString())
+                        .fileFormat(FileFormat.APACHE_PARQUET)
+                        .fileSizeBytes(file.getLen())
+                        .partitionValues(
+                            parquetPartitionHelper.getPartitionValue(
+                                basePath,
+                                file.getPath().toString(),
+                                table.getReadSchema(),
+                                partitionInfo))
+                        .lastModified(file.getModificationTime())
+                        .columnStats(
+                            parquetMetadataExtractor.getColumnStatsForaFile(
+                                hadoopConf, file, table))
+                        .build())
+            .collect(Collectors.toList());
+
+    return InternalSnapshot.builder()
+        .table(table)
+        .partitionedDataFiles(PartitionFileGroup.fromFiles(internalDataFiles))
+        .build();
+  }
+
+  /**
+   * Captures newly added files by listing files whose modification time is greater than the
+   * previous sync time.
+   *
+   * @param modificationTime commit to capture table changes for.
+   * @return the table change containing files added since the given modification time
+   */
+  @Override
+  public TableChange getTableChangeForCommit(Long modificationTime) {
+    List<FileStatus> tableChanges =
+        fsHelper
+            .getParquetFiles(hadoopConf, basePath)
+            .filter(fileStatus -> fileStatus.getModificationTime() > modificationTime)
+            .collect(Collectors.toList());
+    // TODO avoid doing a full listing of the directory to get the schema; the modification time
+    // argument needs to be tweaked
+    InternalTable internalTable = getTable(-1L);
+    Set<InternalDataFile> internalDataFiles = new HashSet<>();
+    Map<String, List<String>> partitionInfo = initPartitionInfo();
+    for (FileStatus tableStatus : tableChanges) {
+      internalDataFiles.add(
+          InternalDataFile.builder()
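
The `mergeAvroSchema` helper called in `getTable` falls outside this excerpt; a minimal sketch
of one possible implementation, assuming each partition key is appended to the file schema as a
nullable string field (the class name and field types are illustrative, not the PR's actual code):

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import org.apache.avro.JsonProperties;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

// Illustrative sketch only: partition values live in the directory structure rather
// than in the parquet files, so each partition key is added as a nullable string.
public class MergeSchemaSketch {
  static Schema mergeAvroSchema(Schema fileSchema, Set<String> partitionKeys) {
    List<Schema.Field> fields = new ArrayList<>();
    // Schema.Field instances cannot be reused across schemas, so re-create them
    for (Schema.Field f : fileSchema.getFields()) {
      fields.add(new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal()));
    }
    Schema nullableString = SchemaBuilder.unionOf().nullType().and().stringType().endUnion();
    for (String key : partitionKeys) {
      fields.add(new Schema.Field(key, nullableString, null, JsonProperties.NULL_VALUE));
    }
    return Schema.createRecord(
        fileSchema.getName(), fileSchema.getDoc(), fileSchema.getNamespace(), false, fields);
  }
}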

Review Comment:
   We also need `recordCount`, which we should be able to get from the column stats.
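
   For `recordCount`, the parquet footer already records a per-row-group row count, so summing
   those is one way to obtain it without scanning column stats. A minimal sketch under that
   assumption (the helper class and method names are illustrative, not part of the PR):

   import java.io.IOException;
   import org.apache.hadoop.conf.Configuration;
   import org.apache.hadoop.fs.Path;
   import org.apache.parquet.hadoop.ParquetFileReader;
   import org.apache.parquet.hadoop.metadata.BlockMetaData;
   import org.apache.parquet.hadoop.util.HadoopInputFile;

   // Illustrative sketch: total record count for a file from its footer metadata.
   public class RecordCountSketch {
     static long readRecordCount(Configuration conf, Path file) throws IOException {
       try (ParquetFileReader reader =
           ParquetFileReader.open(HadoopInputFile.fromPath(file, conf))) {
         return reader.getFooter().getBlocks().stream()
             .mapToLong(BlockMetaData::getRowCount)
             .sum();
       }
     }
   }

   The resulting value could then be passed to the `InternalDataFile` builder, assuming it
   exposes a `recordCount` field as the comment above implies.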


