the-other-tim-brown commented on code in PR #592:
URL: https://github.com/apache/incubator-xtable/pull/592#discussion_r1875038720
##########
xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java:
##########
@@ -0,0 +1,209 @@
+package org.apache.xtable.parquet;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.*;
+import java.util.stream.Collectors;
+import lombok.Builder;
+import lombok.NonNull;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.xtable.avro.AvroSchemaConverter;
+import org.apache.xtable.model.*;
+import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.storage.*;
+import org.apache.xtable.spi.extractor.ConversionSource;
+
+@Builder
+public class ParquetConversionSource implements ConversionSource<Long> {
+
+  private final String tableName;
+  private final String basePath;
+  @NonNull private final Configuration hadoopConf;
+
+  @Builder.Default
+  private static final AvroSchemaConverter schemaExtractor = AvroSchemaConverter.getInstance();
+
+  @Builder.Default private static final FileSystemHelper fsHelper = FileSystemHelper.getInstance();
+
+  @Builder.Default
+  private static final ParquetMetadataExtractor parquetMetadataExtractor =
+      ParquetMetadataExtractor.getInstance();
+
+  @Builder.Default
+  private static final ParquetPartitionHelper parquetPartitionHelper =
+      ParquetPartitionHelper.getInstance();
+
+  private Map<String, List<String>> initPartitionInfo() {
+    return fsHelper.getPartitionFromDirectoryStructure(
+        hadoopConf, basePath, Collections.emptyMap());
+  }
+
+  /**
+   * To infer the schema, reads the latest file; the assumption is that the latest file will
+   * contain any newly added fields.
+   *
+   * @param modificationTime the commit to consider for reading the table state
+   * @return the table state derived from the latest file
+   */
+  @Override
+  public InternalTable getTable(Long modificationTime) {
+
+    Optional<LocatedFileStatus> latestFile =
+        fsHelper
+            .getParquetFiles(hadoopConf, basePath)
+            .max(Comparator.comparing(FileStatus::getModificationTime));
+
+    ParquetMetadata parquetMetadata =
+        parquetMetadataExtractor.readParquetMetadata(hadoopConf, latestFile.get().getPath());
+    Schema tableSchema =
+        new org.apache.parquet.avro.AvroSchemaConverter()
+            .convert(parquetMetadata.getFileMetaData().getSchema());
+
+    Set<String> partitionKeys = initPartitionInfo().keySet();
+
+    // merge the partition fields into the original schema, since partition columns are not
+    // stored in the parquet file itself
+    if (!partitionKeys.isEmpty()) {
+      tableSchema = mergeAvroSchema(tableSchema, partitionKeys);
+    }
+    InternalSchema schema = schemaExtractor.toInternalSchema(tableSchema);
+
+    List<InternalPartitionField> partitionFields =
+        partitionKeys.isEmpty()
+            ? Collections.emptyList()
+            : parquetPartitionHelper.getInternalPartitionField(partitionKeys, schema);
+    DataLayoutStrategy dataLayoutStrategy =
+        partitionFields.isEmpty()
+            ? DataLayoutStrategy.FLAT
+            : DataLayoutStrategy.HIVE_STYLE_PARTITION;
+    return InternalTable.builder()
+        .tableFormat(TableFormat.PARQUET)
+        .basePath(basePath)
+        .name(tableName)
+        .layoutStrategy(dataLayoutStrategy)
+        .partitioningFields(partitionFields)
+        .readSchema(schema)
+        .latestCommitTime(Instant.ofEpochMilli(latestFile.get().getModificationTime()))
+        .build();
+  }
+
+  /**
+   * To get the current snapshot, all files are listed; hence -1 is passed as the modification
+   * time.
+   *
+   * @return the current snapshot of the table
+   */
+  @Override
+  public InternalSnapshot getCurrentSnapshot() {
+
+    List<LocatedFileStatus> latestFile =
+        fsHelper.getParquetFiles(hadoopConf, basePath).collect(Collectors.toList());
+    Map<String, List<String>> partitionInfo = initPartitionInfo();
+    InternalTable table = getTable(-1L);
+    List<InternalDataFile> internalDataFiles =
+        latestFile.stream()
+            .map(
+                file ->
+                    InternalDataFile.builder()
+                        .physicalPath(file.getPath().toString())
+                        .fileFormat(FileFormat.APACHE_PARQUET)
+                        .fileSizeBytes(file.getLen())
+                        .partitionValues(
+                            parquetPartitionHelper.getPartitionValue(
+                                basePath,
+                                file.getPath().toString(),
+                                table.getReadSchema(),
+                                partitionInfo))
+                        .lastModified(file.getModificationTime())
+                        .columnStats(
+                            parquetMetadataExtractor.getColumnStatsForaFile(
+                                hadoopConf, file, table))
+                        .build())
+            .collect(Collectors.toList());
+
+    return InternalSnapshot.builder()
+        .table(table)
+        .partitionedDataFiles(PartitionFileGroup.fromFiles(internalDataFiles))
+        .build();
+  }
+
+  /**
+   * Whenever a new file is added, it is detected by listing files whose modification time is
+   * greater than the previous sync.
+   *
+   * @param modificationTime commit to capture table changes for.
+   * @return the table changes since the given modification time
+   */
+  @Override
+  public TableChange getTableChangeForCommit(Long modificationTime) {
+    List<FileStatus> tableChanges =
+        fsHelper
+            .getParquetFiles(hadoopConf, basePath)
+            .filter(fileStatus -> fileStatus.getModificationTime() > modificationTime)
+            .collect(Collectors.toList());
+    // TODO avoid doing a full listing of the directory to get the schema; only the
+    // modification time argument needs to be tweaked
+    InternalTable internalTable = getTable(-1L);
+    Set<InternalDataFile> internalDataFiles = new HashSet<>();
+    Map<String, List<String>> partitionInfo = initPartitionInfo();
+    for (FileStatus tableStatus : tableChanges) {
+      internalDataFiles.add(
+          InternalDataFile.builder()

Review Comment:
   We also need `recordCount`, which we should be able to get from the column stats.
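   A minimal sketch of one way to populate it: the Parquet footer already stores a row count per row group, so the file-level record count can be summed while the footer is being read for the column stats. This is an illustration, not the PR's implementation; `RecordCountSketch` and its `recordCount` helper are hypothetical names, and it assumes the footer is read the same way `parquetMetadataExtractor.readParquetMetadata` presumably does.

   ```java
   import java.io.IOException;
   import org.apache.hadoop.conf.Configuration;
   import org.apache.hadoop.fs.Path;
   import org.apache.parquet.hadoop.ParquetFileReader;
   import org.apache.parquet.hadoop.metadata.BlockMetaData;
   import org.apache.parquet.hadoop.metadata.ParquetMetadata;
   import org.apache.parquet.hadoop.util.HadoopInputFile;

   // Hypothetical helper: derives a file-level record count by summing the
   // per-row-group row counts already present in the Parquet footer, so no
   // data pages need to be scanned.
   final class RecordCountSketch {

     static long recordCount(Configuration conf, Path file) throws IOException {
       try (ParquetFileReader reader =
           ParquetFileReader.open(HadoopInputFile.fromPath(file, conf))) {
         ParquetMetadata footer = reader.getFooter();
         return footer.getBlocks().stream()
             .mapToLong(BlockMetaData::getRowCount)
             .sum();
       }
     }
   }
   ```

   If `InternalDataFile.builder()` exposes a `recordCount(long)` setter (implied by the review comment but not visible in this hunk), the value could be set alongside `columnStats` without a second pass over the file, since the footer is already parsed for the stats.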