This is an automated email from the ASF dual-hosted git repository.

timbrown pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git
The following commit(s) were added to refs/heads/main by this push:
     new 108ec647 [553] Parquet Schema and Column Stats Converters (#669)
108ec647 is described below

commit 108ec6473c6419a6b93ad0ffe2d39e46d9290f07
Author: Selim Soufargi <80632333+unical1...@users.noreply.github.com>
AuthorDate: Sat May 17 21:00:13 2025 +0200

    [553] Parquet Schema and Column Stats Converters (#669)

    * smaller PR for parquet
    * read parquet file for metadataExtractor: compiling, not testd
    * cleanups for statsExtractor: compiling, not testd
    * refactoring for statsExtractor: compiling, not testd
    * added avro dependency
    * added tests for SchemaExtractor: int and string primitiveTypes test passes
    * fixed some minor bugs in SchemaExtractor
    * close fileReader and handle exception
    * adjusted fromInternalSchema()
    * added a test and adjusted SchemaExtractor
    * added a testing code
    * bug fix for Schema extractor: groupType
    * bug fix for Schema extractor
    * bug fix for tests
    * bug fix for SchemaExtractor and added tests for nested lists support
    * bug fix for tests for nested lists support
    * bug fix for complex test which now passes!
    * added test for Map
    * schemaExtractor refactored
    * bug fixed isNullable() schema
    * fromInternalSchema : list and map types
    * decimal primitive test added
    * float primitive + list and map tests for fromInternalSchema
    * added tests for primitive type (date and timestamp)
    * refactoring for partitionValues extractor
    * git build error fixed
    * cleanups for schemaExtractor + refactoring for schemaExtractorTests + added test code for statsExtractor
    * added assertsEqual test for stats + removed partitionFields from the test, TODO check if field is needed in ColumnStats
    * bug fixed for stats tests: columnStats + tests data are read using FileReader
    * bug fixed for stats tests, TODO equality test for two objects
    * added compareFiles() in InternDataFile for the statsExtractor tests to pass: OK
    * added custom comparison test for ColumnStat and InternDataFile, test passes, TODO: other stat types and other schema types testing
    * added custom comparison test for ColumnStat (field) and exec spotless apply
    * tempDir for parquet stats testing
    * binaryStatistics test passes
    * added int32 file schema test for statsExtractor
    * cleanups + added fields comparison for InternalDataFile
    * cleanups + added fixed_len_byte_array primitive type schema file test
    * use of genericGetMax instead for stats extraction + cleanups
    * boolean schema file test for statsExtractor added
    * removed hard coded path in statsExtractor test
    * cleanups + imports
    * separate tests for int and binary for stats
    * custom equals() not needed for InternalDataFile and ColumnStat
    * removed parquet version from core sub-project pom
    * statsExtractor tests as a suite, removed comments + run spotless apply
    * removed uncessary classes
    * removed uncessary classes: undo
    * undo irrelevant changes
    * fixed formatting issues with spotless:apply cmd
    * cleanups for test class and fixes for failed build
    * tmp file name fixed for failed build
    * cleanups
    * splotless apply run + assertion internalDataFile equality changed to display errors
    * fixes for build, PhysicalPath and BinaryStats
    * fixes for build, PhysicalPath and BinaryStats + synced fork
    * fixes for build, PhysicalPath and BinaryStats + synced fork
    * fixes for build and cleanups
    * fixes for build and cleanups
    * Parquet dep set as provided to use Spark's
    * parquet dep version back to 1.15.1
    * parquet-avro moved from core to project's pom
    * parquet-avro moved after hadoop-common
    * parquet dep scope removed
    * run
spotless:apply --------- Co-authored-by: Selim Soufargi <ssoufargi.idealab.uni...@gmail.com~> --- .../apache/xtable/conversion/ExternalTable.java | 4 + .../ThreePartHierarchicalTableIdentifier.java | 1 + .../apache/xtable/model/schema/InternalField.java | 2 + .../apache/xtable/model/schema/InternalSchema.java | 3 +- xtable-core/pom.xml | 6 +- .../xtable/parquet/ParquetMetadataExtractor.java | 63 +++ .../xtable/parquet/ParquetSchemaExtractor.java | 494 +++++++++++++++++++++ .../xtable/parquet/ParquetStatsExtractor.java | 154 +++++++ .../org/apache/xtable/TestAbstractHudiTable.java | 1 + .../java/org/apache/xtable/TestJavaHudiTable.java | 1 + .../org/apache/xtable/iceberg/StubCatalog.java | 1 + .../xtable/parquet/TestParquetSchemaExtractor.java | 344 ++++++++++++++ .../xtable/parquet/TestParquetStatsExtractor.java | 454 +++++++++++++++++++ .../apache/xtable/utilities/RunCatalogSync.java | 7 + 14 files changed, 1533 insertions(+), 2 deletions(-) diff --git a/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalTable.java b/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalTable.java index 939c59c0..a6c97d8f 100644 --- a/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalTable.java +++ b/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalTable.java @@ -34,12 +34,16 @@ import com.google.common.base.Preconditions; class ExternalTable { /** The name of the table. */ protected final @NonNull String name; + /** The format of the table (e.g. DELTA, ICEBERG, HUDI) */ protected final @NonNull String formatName; + /** The path to the root of the table or the metadata directory depending on the format */ protected final @NonNull String basePath; + /** Optional namespace for the table */ protected final String[] namespace; + /** The configuration for interacting with the catalog that manages this table */ protected final CatalogConfig catalogConfig; diff --git a/xtable-api/src/main/java/org/apache/xtable/model/catalog/ThreePartHierarchicalTableIdentifier.java b/xtable-api/src/main/java/org/apache/xtable/model/catalog/ThreePartHierarchicalTableIdentifier.java index 2608d36a..f387c7d3 100644 --- a/xtable-api/src/main/java/org/apache/xtable/model/catalog/ThreePartHierarchicalTableIdentifier.java +++ b/xtable-api/src/main/java/org/apache/xtable/model/catalog/ThreePartHierarchicalTableIdentifier.java @@ -42,6 +42,7 @@ public class ThreePartHierarchicalTableIdentifier implements HierarchicalTableId * name varies depending on the catalogType. */ String catalogName; + /** * Catalogs have the ability to group tables logically, databaseName is the identifier for such * logical classification. The alternate names for this field include namespace, schemaName etc. diff --git a/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalField.java b/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalField.java index 16b6da8a..31eb0ed4 100644 --- a/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalField.java +++ b/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalField.java @@ -43,9 +43,11 @@ public class InternalField { // The id field for the field. This is used to identify the field in the schema even after // renames. 
Integer fieldId; + // represents the fully qualified path to the field (dot separated) @Getter(lazy = true) String path = createPath(); + // splits the dot separated path into parts @Getter(lazy = true) String[] pathParts = splitPath(); diff --git a/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalSchema.java b/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalSchema.java index 20af37e0..8b7e0fc5 100644 --- a/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalSchema.java +++ b/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalSchema.java @@ -75,7 +75,8 @@ public class InternalSchema { public enum MetadataValue { MICROS, - MILLIS + MILLIS, + NANOS } public static final String XTABLE_LOGICAL_TYPE = "xtableLogicalType"; diff --git a/xtable-core/pom.xml b/xtable-core/pom.xml index 24bc31df..6bd5282c 100644 --- a/xtable-core/pom.xml +++ b/xtable-core/pom.xml @@ -56,7 +56,7 @@ <artifactId>guava</artifactId> </dependency> - <!-- Avro --> + <dependency> <groupId>org.apache.avro</groupId> <artifactId>avro</artifactId> @@ -116,6 +116,10 @@ <artifactId>hadoop-common</artifactId> <scope>provided</scope> </dependency> + <dependency> + <groupId>org.apache.parquet</groupId> + <artifactId>parquet-avro</artifactId> + </dependency> <!-- Logging API --> <dependency> diff --git a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetMetadataExtractor.java b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetMetadataExtractor.java new file mode 100644 index 00000000..f29a186d --- /dev/null +++ b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetMetadataExtractor.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.xtable.parquet; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.HadoopReadOptions; +import org.apache.parquet.ParquetReadOptions; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.parquet.io.InputFile; +import org.apache.parquet.schema.MessageType; + +import org.apache.xtable.exception.ReadException; + +public class ParquetMetadataExtractor { + + private static final ParquetMetadataExtractor INSTANCE = new ParquetMetadataExtractor(); + + public static ParquetMetadataExtractor getInstance() { + return INSTANCE; + } + + public static MessageType getSchema(ParquetMetadata footer) { + MessageType schema = footer.getFileMetaData().getSchema(); + return schema; + } + + public static ParquetMetadata readParquetMetadata(Configuration conf, Path filePath) { + InputFile file = null; + try { + file = HadoopInputFile.fromPath(filePath, conf); + } catch (IOException e) { + throw new ReadException("Failed to read the parquet file", e); + } + + ParquetReadOptions options = HadoopReadOptions.builder(conf, filePath).build(); + try (ParquetFileReader fileReader = ParquetFileReader.open(file, options)) { + return fileReader.getFooter(); + } catch (Exception e) { + throw new ReadException("Failed to read the parquet file", e); + } + } +} diff --git a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetSchemaExtractor.java b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetSchemaExtractor.java new file mode 100644 index 00000000..9043ac0d --- /dev/null +++ b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetSchemaExtractor.java @@ -0,0 +1,494 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.xtable.parquet; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.EnumMap; +import java.util.List; +import java.util.Map; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; + +import org.apache.avro.Schema; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.schema.Type; +import org.apache.parquet.schema.Type.Repetition; +import org.apache.parquet.schema.Types; + +import org.apache.xtable.collectors.CustomCollectors; +import org.apache.xtable.exception.SchemaExtractorException; +import org.apache.xtable.exception.UnsupportedSchemaTypeException; +import org.apache.xtable.model.schema.InternalField; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.schema.InternalType; +import org.apache.xtable.schema.SchemaUtils; + +/** + * Class that converts parquet Schema {@link Schema} to Canonical Schema {@link InternalSchema} and + * vice-versa. This conversion is fully reversible and there is a strict 1 to 1 mapping between + * parquet data types and canonical data types. + */ +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class ParquetSchemaExtractor { + // parquet only supports string keys in maps + private static final InternalField MAP_KEY_FIELD = + InternalField.builder() + .name(InternalField.Constants.MAP_KEY_FIELD_NAME) + .schema( + InternalSchema.builder() + .name("map_key") + .dataType(InternalType.STRING) + .isNullable(false) + .build()) + .defaultValue("") + .build(); + private static final ParquetSchemaExtractor INSTANCE = new ParquetSchemaExtractor(); + private static final String ELEMENT = "element"; + private static final String KEY = "key"; + private static final String VALUE = "value"; + + public static ParquetSchemaExtractor getInstance() { + return INSTANCE; + } + + private static boolean isNullable(Type schema) { + return schema.getRepetition() != Repetition.REQUIRED; + } + + /** + * Converts the parquet {@link Schema} to {@link InternalSchema}. + * + * @param schema The schema being converted + * @param parentPath If this schema is nested within another, this will be a dot separated string + * representing the path from the top most field to the current schema. 
+ * @return a converted schema + */ + public InternalSchema toInternalSchema(Type schema, String parentPath) { + InternalType newDataType = null; + Type.Repetition currentRepetition = null; + List<InternalField> subFields = null; + PrimitiveType primitiveType; + LogicalTypeAnnotation logicalType; + Map<InternalSchema.MetadataKey, Object> metadata = + new EnumMap<>(InternalSchema.MetadataKey.class); + String elementName = schema.getName(); + if (schema.isPrimitive()) { + primitiveType = schema.asPrimitiveType(); + switch (primitiveType.getPrimitiveTypeName()) { + // PrimitiveTypes + case INT64: + logicalType = schema.getLogicalTypeAnnotation(); + if (logicalType instanceof LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) { + LogicalTypeAnnotation.TimeUnit timeUnit = + ((LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) logicalType).getUnit(); + boolean isAdjustedToUTC = + ((LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) logicalType) + .isAdjustedToUTC(); + if (isAdjustedToUTC) { + newDataType = InternalType.TIMESTAMP; + } else { + newDataType = InternalType.TIMESTAMP_NTZ; + } + if (timeUnit == LogicalTypeAnnotation.TimeUnit.MICROS) { + metadata.put( + InternalSchema.MetadataKey.TIMESTAMP_PRECISION, + InternalSchema.MetadataValue.MICROS); + } else if (timeUnit == LogicalTypeAnnotation.TimeUnit.MILLIS) { + metadata.put( + InternalSchema.MetadataKey.TIMESTAMP_PRECISION, + InternalSchema.MetadataValue.MILLIS); + } else if (timeUnit == LogicalTypeAnnotation.TimeUnit.NANOS) { + metadata.put( + InternalSchema.MetadataKey.TIMESTAMP_PRECISION, + InternalSchema.MetadataValue.NANOS); + } + } else if (logicalType instanceof LogicalTypeAnnotation.IntLogicalTypeAnnotation) { + newDataType = InternalType.INT; + } else if (logicalType instanceof LogicalTypeAnnotation.TimeLogicalTypeAnnotation) { + LogicalTypeAnnotation.TimeUnit timeUnit = + ((LogicalTypeAnnotation.TimeLogicalTypeAnnotation) logicalType).getUnit(); + if (timeUnit == LogicalTypeAnnotation.TimeUnit.MICROS + || timeUnit == LogicalTypeAnnotation.TimeUnit.NANOS) { + newDataType = InternalType.INT; + } + } else if (logicalType instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) { + + metadata.put( + InternalSchema.MetadataKey.DECIMAL_PRECISION, + ((LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalType).getPrecision()); + metadata.put( + InternalSchema.MetadataKey.DECIMAL_SCALE, + ((LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalType).getScale()); + newDataType = InternalType.DECIMAL; + + } else { + newDataType = InternalType.LONG; + } + break; + case INT32: + logicalType = schema.getLogicalTypeAnnotation(); + if (logicalType instanceof LogicalTypeAnnotation.DateLogicalTypeAnnotation) { + newDataType = InternalType.DATE; + } else if (logicalType instanceof LogicalTypeAnnotation.TimeLogicalTypeAnnotation) { + LogicalTypeAnnotation.TimeUnit timeUnit = + ((LogicalTypeAnnotation.TimeLogicalTypeAnnotation) logicalType).getUnit(); + if (timeUnit == LogicalTypeAnnotation.TimeUnit.MILLIS) { + newDataType = InternalType.INT; + } + } else if (logicalType instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) { + metadata.put( + InternalSchema.MetadataKey.DECIMAL_PRECISION, + ((LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalType).getPrecision()); + metadata.put( + InternalSchema.MetadataKey.DECIMAL_SCALE, + ((LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalType).getScale()); + newDataType = InternalType.DECIMAL; + + } else { + newDataType = InternalType.INT; + } + break; + case 
FLOAT: + newDataType = InternalType.FLOAT; + break; + case FIXED_LEN_BYTE_ARRAY: + logicalType = schema.getLogicalTypeAnnotation(); + if (logicalType instanceof LogicalTypeAnnotation.UUIDLogicalTypeAnnotation) { + newDataType = InternalType.UUID; + } else if (logicalType instanceof LogicalTypeAnnotation.IntervalLogicalTypeAnnotation) { + metadata.put(InternalSchema.MetadataKey.FIXED_BYTES_SIZE, 12); + newDataType = InternalType.FIXED; + } else if (logicalType instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) { + metadata.put( + InternalSchema.MetadataKey.DECIMAL_PRECISION, + ((LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalType).getPrecision()); + metadata.put( + InternalSchema.MetadataKey.DECIMAL_SCALE, + ((LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalType).getScale()); + newDataType = InternalType.DECIMAL; + } + break; + case BINARY: + // TODO Variant,GEOMETRY, GEOGRAPHY, + logicalType = schema.getLogicalTypeAnnotation(); + if (logicalType instanceof LogicalTypeAnnotation.EnumLogicalTypeAnnotation) { + metadata.put( + InternalSchema.MetadataKey.ENUM_VALUES, logicalType.toOriginalType().values()); + newDataType = InternalType.ENUM; + } else if (logicalType instanceof LogicalTypeAnnotation.JsonLogicalTypeAnnotation) { + newDataType = InternalType.BYTES; + } else if (logicalType instanceof LogicalTypeAnnotation.BsonLogicalTypeAnnotation) { + newDataType = InternalType.BYTES; + } else if (logicalType instanceof LogicalTypeAnnotation.StringLogicalTypeAnnotation) { + newDataType = InternalType.STRING; + } else if (logicalType instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) { + metadata.put( + InternalSchema.MetadataKey.DECIMAL_PRECISION, + ((LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalType).getPrecision()); + metadata.put( + InternalSchema.MetadataKey.DECIMAL_SCALE, + ((LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalType).getScale()); + newDataType = InternalType.DECIMAL; + + } else { + newDataType = InternalType.BYTES; + } + break; + case BOOLEAN: + newDataType = InternalType.BOOLEAN; + break; + default: + throw new UnsupportedSchemaTypeException( + String.format("Unsupported schema type %s", schema)); + } + } else { + // GroupTypes + logicalType = schema.getLogicalTypeAnnotation(); + if (logicalType instanceof LogicalTypeAnnotation.ListLogicalTypeAnnotation) { + String schemaName = schema.asGroupType().getName(); + Type.ID schemaId = schema.getId(); + InternalSchema elementSchema = + toInternalSchema( + schema.asGroupType().getType(0), + SchemaUtils.getFullyQualifiedPath( + parentPath, InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME)); + InternalField elementField = + InternalField.builder() + .name(InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME) + .parentPath(parentPath) + .schema(elementSchema) + .fieldId(schemaId == null ? 
null : schemaId.intValue()) + .build(); + return InternalSchema.builder() + .name(schema.getName()) + .dataType(InternalType.LIST) + .comment(null) + .isNullable(isNullable(schema.asGroupType())) + .fields(Collections.singletonList(elementField)) + .build(); + } else if (logicalType instanceof LogicalTypeAnnotation.MapLogicalTypeAnnotation) { + String schemaName = schema.asGroupType().getName(); + Type.ID schemaId = schema.getId(); + InternalSchema valueSchema = + toInternalSchema( + schema.asGroupType().getType(0), + SchemaUtils.getFullyQualifiedPath( + parentPath, InternalField.Constants.MAP_VALUE_FIELD_NAME)); + InternalField valueField = + InternalField.builder() + .name(InternalField.Constants.MAP_VALUE_FIELD_NAME) + .parentPath(parentPath) + .schema(valueSchema) + .fieldId(schemaId == null ? null : schemaId.intValue()) + .build(); + return InternalSchema.builder() + .name(schemaName) + .dataType(InternalType.MAP) + .comment(null) + .isNullable(isNullable(schema.asGroupType())) + .fields(valueSchema.getFields()) + .build(); + } else { + + subFields = new ArrayList<>(schema.asGroupType().getFields().size()); + for (Type parquetField : schema.asGroupType().getFields()) { + String fieldName = parquetField.getName(); + Type.ID fieldId = parquetField.getId(); + currentRepetition = parquetField.getRepetition(); + InternalSchema subFieldSchema = + toInternalSchema( + parquetField, SchemaUtils.getFullyQualifiedPath(parentPath, fieldName)); + + if (schema.asGroupType().getFields().size() + == 1) { // TODO Tuple (many subelements in a list) + newDataType = subFieldSchema.getDataType(); + elementName = subFieldSchema.getName(); + break; + } + subFields.add( + InternalField.builder() + .parentPath(parentPath) + .name(fieldName) + .schema(subFieldSchema) + .defaultValue(null) + .fieldId(fieldId == null ? null : fieldId.intValue()) + .build()); + } + if (currentRepetition != Repetition.REPEATED + && schema.asGroupType().getName() != "list" + && !Arrays.asList("key_value", "map").contains(schema.asGroupType().getName())) { + return InternalSchema.builder() + .name(schema.getName()) + .comment(null) + .dataType(InternalType.RECORD) + .fields(subFields) + .isNullable(isNullable(schema.asGroupType())) + .build(); + } + } + } + return InternalSchema.builder() + .name(elementName) + .dataType(newDataType) + .fields(subFields == null || subFields.size() == 0 ? null : subFields) + .comment(null) + .isNullable(isNullable(schema)) + .metadata(metadata.isEmpty() ? null : metadata) + .build(); + } + + /** + * Internal method for converting the {@link InternalSchema} to parquet {@link Schema}. + * + * @param internalSchema internal schema representation + * @param currentPath If this schema is nested within another, this will be a dot separated + * string. This is used for the parquet namespace to guarantee unique names for nested + * records. 
+ * @return an parquet schema + */ + public Type fromInternalSchema(InternalSchema internalSchema, String currentPath) { + Type type = null; + Type listType = null; + Type mapType = null; + Type mapKeyType = null; + Type mapValueType = null; + String fieldName = internalSchema.getName(); + InternalType internalType = internalSchema.getDataType(); + switch (internalType) { + case BOOLEAN: + type = + Types.required(PrimitiveTypeName.BOOLEAN) + .as(LogicalTypeAnnotation.intType(8, false)) + .named(fieldName); + break; + case INT: + type = + Types.required(PrimitiveTypeName.INT32) + .as(LogicalTypeAnnotation.intType(32, false)) + .named(fieldName); + break; + case LONG: + type = + Types.required(PrimitiveTypeName.INT64) + .as(LogicalTypeAnnotation.intType(64, false)) + .named(fieldName); + break; + case STRING: + type = + Types.required(PrimitiveTypeName.BINARY) + .as(LogicalTypeAnnotation.stringType()) + .named(fieldName); + break; + case FLOAT: + type = Types.required(PrimitiveTypeName.FLOAT).named(fieldName); + break; + case DECIMAL: + int precision = + (int) internalSchema.getMetadata().get(InternalSchema.MetadataKey.DECIMAL_PRECISION); + int scale = + (int) internalSchema.getMetadata().get(InternalSchema.MetadataKey.DECIMAL_SCALE); + type = + Types.required(PrimitiveTypeName.FLOAT) + .as(LogicalTypeAnnotation.decimalType(scale, precision)) + .named(fieldName); + break; + + case ENUM: + type = + new org.apache.parquet.avro.AvroSchemaConverter() + .convert( + Schema.createEnum( + fieldName, + internalSchema.getComment(), + null, + (List<String>) + internalSchema + .getMetadata() + .get(InternalSchema.MetadataKey.ENUM_VALUES), + null)) + .getType(fieldName); + break; + case DATE: + type = + Types.required(PrimitiveTypeName.INT32) + .as(LogicalTypeAnnotation.dateType()) + .named(fieldName); + break; + case TIMESTAMP: + if (internalSchema.getMetadata().get(InternalSchema.MetadataKey.TIMESTAMP_PRECISION) + == InternalSchema.MetadataValue.MICROS) { + type = + Types.required(PrimitiveTypeName.INT64) + .as( + LogicalTypeAnnotation.timestampType( + true, LogicalTypeAnnotation.TimeUnit.MICROS)) + .named(fieldName); + } + if (internalSchema.getMetadata().get(InternalSchema.MetadataKey.TIMESTAMP_PRECISION) + == InternalSchema.MetadataValue.MILLIS) { + type = + Types.required(PrimitiveTypeName.INT64) + .as( + LogicalTypeAnnotation.timestampType( + true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named(fieldName); + } else if (internalSchema.getMetadata().get(InternalSchema.MetadataKey.TIMESTAMP_PRECISION) + == InternalSchema.MetadataValue.NANOS) { + type = + Types.required(PrimitiveTypeName.INT64) + .as( + LogicalTypeAnnotation.timestampType( + true, LogicalTypeAnnotation.TimeUnit.NANOS)) + .named(fieldName); + } + break; + case TIMESTAMP_NTZ: + if (internalSchema.getMetadata().get(InternalSchema.MetadataKey.TIMESTAMP_PRECISION) + == InternalSchema.MetadataValue.MICROS) { + type = + Types.required(PrimitiveTypeName.INT64) + .as( + LogicalTypeAnnotation.timestampType( + true, LogicalTypeAnnotation.TimeUnit.MICROS)) + .named(fieldName); + + } else { + type = + Types.required(PrimitiveTypeName.INT64) + .as( + LogicalTypeAnnotation.timestampType( + true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named(fieldName); + } + break; + case LIST: + InternalField elementField = + internalSchema.getFields().stream() + .filter( + field -> + InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME.equals(field.getName())) + .findFirst() + .orElseThrow(() -> new SchemaExtractorException("Invalid array schema")); + listType = 
fromInternalSchema(elementField.getSchema(), elementField.getPath()); + type = Types.requiredList().setElementType(listType).named(internalSchema.getName()); + // TODO nullable lists + break; + case MAP: + InternalField keyField = + internalSchema.getFields().stream() + .filter(field -> InternalField.Constants.MAP_KEY_FIELD_NAME.equals(field.getName())) + .findFirst() + .orElseThrow(() -> new SchemaExtractorException("Invalid map schema")); + InternalField valueField = + internalSchema.getFields().stream() + .filter( + field -> InternalField.Constants.MAP_VALUE_FIELD_NAME.equals(field.getName())) + .findFirst() + .orElseThrow(() -> new SchemaExtractorException("Invalid map schema")); + mapKeyType = fromInternalSchema(keyField.getSchema(), valueField.getPath()); + mapValueType = fromInternalSchema(valueField.getSchema(), valueField.getPath()); + type = + Types.requiredMap().key(mapKeyType).value(mapValueType).named(internalSchema.getName()); + // TODO nullable lists + break; + case RECORD: + List<Type> fields = + internalSchema.getFields().stream() + .map( + field -> + fromInternalSchema( + field.getSchema(), + SchemaUtils.getFullyQualifiedPath(field.getName(), currentPath))) + .collect(CustomCollectors.toList(internalSchema.getFields().size())); + type = + Types.requiredGroup().addFields(fields.stream().toArray(Type[]::new)).named(fieldName); + break; + default: + throw new UnsupportedSchemaTypeException( + "Encountered unhandled type during InternalSchema to parquet conversion:" + + internalType); + } + return type; + } +} diff --git a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetStatsExtractor.java b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetStatsExtractor.java new file mode 100644 index 00000000..8d02b392 --- /dev/null +++ b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetStatsExtractor.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.xtable.parquet; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +import lombok.Builder; +import lombok.Value; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.*; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.schema.MessageType; + +import org.apache.xtable.model.schema.InternalField; +import org.apache.xtable.model.stat.ColumnStat; +import org.apache.xtable.model.stat.PartitionValue; +import org.apache.xtable.model.stat.Range; +import org.apache.xtable.model.storage.FileFormat; +import org.apache.xtable.model.storage.InternalDataFile; + +@Value +@Builder +public class ParquetStatsExtractor { + + private static final ParquetStatsExtractor INSTANCE = new ParquetStatsExtractor(); + + private static final ParquetSchemaExtractor schemaExtractor = + ParquetSchemaExtractor.getInstance(); + + private static final ParquetMetadataExtractor parquetMetadataExtractor = + ParquetMetadataExtractor.getInstance(); + + // private static final InputPartitionFields partitions = null; + + public static ParquetStatsExtractor getInstance() { + return INSTANCE; + } + + public static List<ColumnStat> getColumnStatsForaFile(ParquetMetadata footer) { + return getStatsForFile(footer).values().stream() + .flatMap(List::stream) + .collect(Collectors.toList()); + } + + private static Optional<Long> getMaxFromColumnStats(List<ColumnStat> columnStats) { + return columnStats.stream() + .filter(entry -> entry.getField().getParentPath() == null) + .map(ColumnStat::getNumValues) + .filter(numValues -> numValues > 0) + .max(Long::compareTo); + } + + public static Map<ColumnDescriptor, List<ColumnStat>> getStatsForFile(ParquetMetadata footer) { + Map<ColumnDescriptor, List<ColumnStat>> columnDescStats = new HashMap<>(); + MessageType schema = parquetMetadataExtractor.getSchema(footer); + List<ColumnChunkMetaData> columns = new ArrayList<>(); + columns = + footer.getBlocks().stream() + .flatMap(blockMetaData -> blockMetaData.getColumns().stream()) + .collect(Collectors.toList()); + columnDescStats = + columns.stream() + .collect( + Collectors.groupingBy( + columnMetaData -> + schema.getColumnDescription(columnMetaData.getPath().toArray()), + Collectors.mapping( + columnMetaData -> + ColumnStat.builder() + .field( + InternalField.builder() + .name(columnMetaData.getPrimitiveType().getName()) + .fieldId( + columnMetaData.getPrimitiveType().getId() == null + ? 
null + : columnMetaData + .getPrimitiveType() + .getId() + .intValue()) + .parentPath(null) + .schema( + schemaExtractor.toInternalSchema( + columnMetaData.getPrimitiveType(), + columnMetaData.getPath().toDotString())) + .build()) + .numValues(columnMetaData.getValueCount()) + .totalSize(columnMetaData.getTotalSize()) + .range( + Range.vector( + columnMetaData.getStatistics().genericGetMin(), + columnMetaData.getStatistics().genericGetMax())) + .build(), + Collectors.toList()))); + return columnDescStats; + } + + /* private static InputPartitionFields initPartitionInfo() { + return partitions; + }*/ + + public static InternalDataFile toInternalDataFile(Configuration hadoopConf, Path parentPath) + throws IOException { + FileStatus file = null; + List<PartitionValue> partitionValues = null; + ParquetMetadata footer = null; + List<ColumnStat> columnStatsForAFile = null; + try { + FileSystem fs = FileSystem.get(hadoopConf); + file = fs.getFileStatus(parentPath); + // InputPartitionFields partitionInfo = initPartitionInfo(); + footer = parquetMetadataExtractor.readParquetMetadata(hadoopConf, parentPath); + MessageType schema = parquetMetadataExtractor.getSchema(footer); + columnStatsForAFile = getColumnStatsForaFile(footer); + // partitionValues = partitionExtractor.createPartitionValues( + // partitionInfo); + } catch (java.io.IOException e) { + + } + return InternalDataFile.builder() + .physicalPath(parentPath.toString()) + .fileFormat(FileFormat.APACHE_PARQUET) + // .partitionValues(partitionValues) + .fileSizeBytes(file.getLen()) + .recordCount(getMaxFromColumnStats(columnStatsForAFile).orElse(0L)) + .columnStats(columnStatsForAFile) + .lastModified(file.getModificationTime()) + .build(); + } +} diff --git a/xtable-core/src/test/java/org/apache/xtable/TestAbstractHudiTable.java b/xtable-core/src/test/java/org/apache/xtable/TestAbstractHudiTable.java index 252b5b26..89460c40 100644 --- a/xtable-core/src/test/java/org/apache/xtable/TestAbstractHudiTable.java +++ b/xtable-core/src/test/java/org/apache/xtable/TestAbstractHudiTable.java @@ -127,6 +127,7 @@ public abstract class TestAbstractHudiTable throw new UncheckedIOException(ex); } } + // Name of the table protected String tableName; // Base path for the table diff --git a/xtable-core/src/test/java/org/apache/xtable/TestJavaHudiTable.java b/xtable-core/src/test/java/org/apache/xtable/TestJavaHudiTable.java index ce3b25bd..abbe7fe6 100644 --- a/xtable-core/src/test/java/org/apache/xtable/TestJavaHudiTable.java +++ b/xtable-core/src/test/java/org/apache/xtable/TestJavaHudiTable.java @@ -66,6 +66,7 @@ public class TestJavaHudiTable extends TestAbstractHudiTable { private HoodieJavaWriteClient<HoodieAvroPayload> writeClient; private final Configuration conf; + /** * Create a test table instance for general testing. The table is created with the schema defined * in basic_schema.avsc which contains many data types to ensure they are handled correctly. 
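For reference, the sketch below shows one way the three converters introduced above (ParquetMetadataExtractor, ParquetSchemaExtractor, ParquetStatsExtractor) could be wired together to turn a Parquet file into XTable's canonical schema and file representations. It is a minimal, illustrative example that relies only on methods added in this commit; the wrapper class name and the local file path are hypothetical, and error handling is omitted.

// Hedged usage sketch of the converters added in this commit.
// The class name and file path are hypothetical.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.MessageType;

import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.storage.InternalDataFile;
import org.apache.xtable.parquet.ParquetMetadataExtractor;
import org.apache.xtable.parquet.ParquetSchemaExtractor;
import org.apache.xtable.parquet.ParquetStatsExtractor;

public class ParquetConverterExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path file = new Path("/tmp/example.parquet"); // hypothetical path

    // Read the footer once and derive the Parquet schema from it.
    ParquetMetadata footer = ParquetMetadataExtractor.readParquetMetadata(conf, file);
    MessageType parquetSchema = ParquetMetadataExtractor.getSchema(footer);

    // Convert the Parquet schema to XTable's canonical InternalSchema.
    InternalSchema internalSchema =
        ParquetSchemaExtractor.getInstance().toInternalSchema(parquetSchema, null);

    // Build an InternalDataFile carrying per-column stats (value counts, sizes, min/max ranges).
    InternalDataFile dataFile = ParquetStatsExtractor.toInternalDataFile(conf, file);

    System.out.println(internalSchema);
    System.out.println(dataFile);
  }
}

Note that toInternalDataFile reads the footer itself, so the explicit readParquetMetadata/getSchema calls are only needed when the schema is wanted independently of the stats.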
diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/StubCatalog.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/StubCatalog.java index a25063c4..9475d503 100644 --- a/xtable-core/src/test/java/org/apache/xtable/iceberg/StubCatalog.java +++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/StubCatalog.java @@ -36,6 +36,7 @@ public class StubCatalog implements Catalog { public static void registerMock(String catalogName, Catalog catalog) { REGISTERED_MOCKS.put(catalogName, catalog); } + // use a mocked catalog instance to more easily test private Catalog mockedCatalog; diff --git a/xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetSchemaExtractor.java b/xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetSchemaExtractor.java new file mode 100644 index 00000000..13d2299f --- /dev/null +++ b/xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetSchemaExtractor.java @@ -0,0 +1,344 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.parquet; + +import static org.junit.jupiter.api.Assertions.*; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import org.apache.parquet.schema.*; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.schema.Type; +import org.apache.parquet.schema.Type.Repetition; +import org.apache.parquet.schema.Types; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import org.apache.xtable.model.schema.InternalField; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.schema.InternalType; + +public class TestParquetSchemaExtractor { + private static final ParquetSchemaExtractor schemaExtractor = + ParquetSchemaExtractor.getInstance(); + + @Test + public void testPrimitiveTypes() { + + InternalSchema primitive1 = + InternalSchema.builder().name("integer").dataType(InternalType.INT).build(); + InternalSchema primitive2 = + InternalSchema.builder().name("string").dataType(InternalType.STRING).build(); + + Map<InternalSchema.MetadataKey, Object> fixedDecimalMetadata = new HashMap<>(); + fixedDecimalMetadata.put(InternalSchema.MetadataKey.DECIMAL_PRECISION, 6); + fixedDecimalMetadata.put(InternalSchema.MetadataKey.DECIMAL_SCALE, 5); + InternalSchema decimalType = + InternalSchema.builder() + .name("decimal") + .dataType(InternalType.DECIMAL) + .isNullable(false) + .metadata(fixedDecimalMetadata) + .build(); + + Type stringPrimitiveType = + Types.required(PrimitiveTypeName.BINARY) + .as(LogicalTypeAnnotation.stringType()) + .named("string"); + + Type intPrimitiveType = + 
Types.required(PrimitiveTypeName.INT32) + .as(LogicalTypeAnnotation.intType(32, false)) + .named("integer"); + + Type decimalPrimitive = + Types.required(PrimitiveTypeName.INT32) + .as(LogicalTypeAnnotation.decimalType(5, 6)) + .named("decimal"); + + Assertions.assertEquals(primitive1, schemaExtractor.toInternalSchema(intPrimitiveType, null)); + + Assertions.assertEquals( + primitive2, schemaExtractor.toInternalSchema(stringPrimitiveType, null)); + + Assertions.assertEquals(decimalType, schemaExtractor.toInternalSchema(decimalPrimitive, null)); + + // tests for timestamp and date + InternalSchema testDate = + InternalSchema.builder().name("date").dataType(InternalType.DATE).isNullable(false).build(); + + Map<InternalSchema.MetadataKey, Object> millisMetadata = + Collections.singletonMap( + InternalSchema.MetadataKey.TIMESTAMP_PRECISION, InternalSchema.MetadataValue.MILLIS); + Map<InternalSchema.MetadataKey, Object> microsMetadata = + Collections.singletonMap( + InternalSchema.MetadataKey.TIMESTAMP_PRECISION, InternalSchema.MetadataValue.MICROS); + Map<InternalSchema.MetadataKey, Object> nanosMetadata = + Collections.singletonMap( + InternalSchema.MetadataKey.TIMESTAMP_PRECISION, InternalSchema.MetadataValue.NANOS); + + InternalSchema testTimestampMillis = + InternalSchema.builder() + .name("timestamp_millis") + .dataType(InternalType.TIMESTAMP_NTZ) + .isNullable(false) + .metadata(millisMetadata) + .build(); + + InternalSchema testTimestampMicros = + InternalSchema.builder() + .name("timestamp_micros") + .dataType(InternalType.TIMESTAMP) + .isNullable(false) + .metadata(microsMetadata) + .build(); + + InternalSchema testTimestampNanos = + InternalSchema.builder() + .name("timestamp_nanos") + .dataType(InternalType.TIMESTAMP_NTZ) + .isNullable(false) + .metadata(nanosMetadata) + .build(); + + Type timestampMillisPrimitiveType = + Types.required(PrimitiveTypeName.INT64) + .as(LogicalTypeAnnotation.timestampType(false, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("timestamp_millis"); + Type timestampNanosPrimitiveType = + Types.required(PrimitiveTypeName.INT64) + .as(LogicalTypeAnnotation.timestampType(false, LogicalTypeAnnotation.TimeUnit.NANOS)) + .named("timestamp_nanos"); + Assertions.assertEquals( + testTimestampMillis, schemaExtractor.toInternalSchema(timestampMillisPrimitiveType, null)); + Assertions.assertEquals( + testTimestampNanos, schemaExtractor.toInternalSchema(timestampNanosPrimitiveType, null)); + + // test date + + Type datePrimitiveType = + Types.required(PrimitiveTypeName.INT32).as(LogicalTypeAnnotation.dateType()).named("date"); + Assertions.assertEquals(testDate, schemaExtractor.toInternalSchema(datePrimitiveType, null)); + } + + @Test + public void testGroupTypes() { + + // map + + InternalSchema internalMap = + InternalSchema.builder() + .name("map") + .isNullable(false) + .dataType(InternalType.MAP) + .fields( + Arrays.asList( + InternalField.builder() + .name("key") + .parentPath("_one_field_value") + .schema( + InternalSchema.builder() + .name("key") + .dataType(InternalType.FLOAT) + .isNullable(false) + .build()) + .defaultValue(null) + .build(), + InternalField.builder() + .name("value") + .parentPath("_one_field_value") + .schema( + InternalSchema.builder() + .name("value") + .dataType(InternalType.INT) + .isNullable(false) + .build()) + .build())) + .build(); + + /* testing from fromInternalSchema()*/ + + GroupType fromSimpleList = + Types.requiredList() + .element( + Types.required(PrimitiveTypeName.INT32) + .as(LogicalTypeAnnotation.intType(32, false)) + 
.named(InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME)) + .named("my_list"); + + InternalSchema fromInternalList = + InternalSchema.builder() + .name("my_list") + .isNullable(false) + .dataType(InternalType.LIST) + .fields( + Arrays.asList( + InternalField.builder() + .name(InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME) + .parentPath(null) + .schema( + InternalSchema.builder() + .name(InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME) + .dataType(InternalType.INT) + .isNullable(false) + .build()) + .build())) + .build(); + + GroupType fromTestMap = + Types.requiredMap() + .key(Types.primitive(PrimitiveTypeName.FLOAT, Repetition.REQUIRED).named("key")) + .value( + Types.primitive(PrimitiveTypeName.INT32, Repetition.REQUIRED) + .as(LogicalTypeAnnotation.intType(32, false)) + .named("value")) + .named("map"); + InternalSchema fromInternalMap = + InternalSchema.builder() + .name("map") + .isNullable(false) + .dataType(InternalType.MAP) + .fields( + Arrays.asList( + InternalField.builder() + .name("_one_field_key") // "key") + .parentPath("_one_field_value") + .schema( + InternalSchema.builder() + .name("key") + .dataType(InternalType.FLOAT) + .isNullable(false) + .build()) + .defaultValue(null) + .build(), + InternalField.builder() + .name("_one_field_value") // "value") + .parentPath("_one_field_value") + .schema( + InternalSchema.builder() + .name("value") + .dataType(InternalType.INT) + .isNullable(false) + .build()) + .build())) + .build(); + + InternalSchema recordListElementSchema = + InternalSchema.builder() + .name("my_group") + .isNullable(false) + .fields( + Arrays.asList( + InternalField.builder() + .name("id") + .parentPath("my_group") + .schema( + InternalSchema.builder() + .name("id") + .dataType(InternalType.LONG) + .isNullable(false) + .build()) + .build(), + InternalField.builder() + .name("name") + .parentPath("my_group") + .schema( + InternalSchema.builder() + .name("name") + .dataType(InternalType.STRING) + .isNullable(true) + .build()) + .defaultValue(null) + .build())) + .dataType(InternalType.RECORD) + .build(); + InternalSchema internalSchema = + InternalSchema.builder() + .name("my_record") + .dataType(InternalType.RECORD) + .isNullable(true) + .fields( + Arrays.asList( + InternalField.builder() + .name("my_list") + .schema( + InternalSchema.builder() + .name("my_list") + .isNullable(false) + .dataType(InternalType.LIST) + .fields( + Arrays.asList( + InternalField.builder() + .name(InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME) + .parentPath("my_list") + .schema( + InternalSchema.builder() + .name("element") + .dataType(InternalType.INT) + .isNullable(true) + .build()) + .build())) + .build()) + .build(), + InternalField.builder() + .name("my_group") + .schema(recordListElementSchema) + .defaultValue(null) + .build())) + .build(); + + Assertions.assertEquals(fromTestMap, schemaExtractor.fromInternalSchema(fromInternalMap, null)); + Assertions.assertEquals( + fromSimpleList, schemaExtractor.fromInternalSchema(fromInternalList, null)); + + GroupType testGroupType = + Types.requiredGroup() + .required(PrimitiveTypeName.INT64) + .named("id") + .optional(PrimitiveTypeName.BINARY) + .as(LogicalTypeAnnotation.stringType()) + .named("name") + .named("my_group"); + + GroupType testMap = + Types.requiredMap() + .key(Types.primitive(PrimitiveTypeName.FLOAT, Repetition.REQUIRED).named("key")) + .value(Types.primitive(PrimitiveTypeName.INT32, Repetition.REQUIRED).named("value")) + .named("map"); + GroupType listType = + Types.requiredList() + .setElementType( + 
Types.primitive(PrimitiveTypeName.INT32, Repetition.REQUIRED).named("element")) + .named("my_list"); + MessageType messageType = + Types.buildMessage() + // .addField(testMap) + .addField(listType) + .addField(testGroupType) + .named("my_record"); + + Assertions.assertEquals(internalMap, schemaExtractor.toInternalSchema(testMap, null)); + Assertions.assertEquals(internalSchema, schemaExtractor.toInternalSchema(messageType, null)); + } +} diff --git a/xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetStatsExtractor.java b/xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetStatsExtractor.java new file mode 100644 index 00000000..7522c292 --- /dev/null +++ b/xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetStatsExtractor.java @@ -0,0 +1,454 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.parquet; + +import static org.apache.parquet.column.Encoding.BIT_PACKED; +import static org.apache.parquet.column.Encoding.PLAIN; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.statistics.BinaryStatistics; +import org.apache.parquet.column.statistics.BooleanStatistics; +import org.apache.parquet.column.statistics.IntStatistics; +import org.apache.parquet.hadoop.ParquetFileWriter; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.*; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.MessageTypeParser; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.schema.Type.Repetition; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import org.apache.xtable.model.schema.InternalField; +import org.apache.xtable.model.stat.ColumnStat; +import org.apache.xtable.model.stat.Range; +import org.apache.xtable.model.storage.FileFormat; +import org.apache.xtable.model.storage.InternalDataFile; + +public class TestParquetStatsExtractor { + + private static final ParquetSchemaExtractor schemaExtractor = + ParquetSchemaExtractor.getInstance(); + + @TempDir static java.nio.file.Path tempDir = Paths.get("./"); + + public static List<ColumnStat> initBooleanFileTest(File file) throws IOException { + // create the parquet file by parsing a schema + Path path = new Path(file.toURI()); + Configuration configuration = new 
Configuration(); + + MessageType schema = + MessageTypeParser.parseMessageType("message m { required group a {required boolean b;}}"); + String[] columnPath = {"a", "b"}; + ColumnDescriptor c1 = schema.getColumnDescription(columnPath); + CompressionCodecName codec = CompressionCodecName.UNCOMPRESSED; + BooleanStatistics stats = new BooleanStatistics(); + stats.updateStats(true); + stats.updateStats(false); + + // write the string columned file + + ParquetFileWriter w = new ParquetFileWriter(configuration, schema, path); + w.start(); + w.startBlock(3); + w.startColumn(c1, 5, codec); + w.writeDataPage(2, 4, BytesInput.fromInt(1), stats, BIT_PACKED, BIT_PACKED, PLAIN); + w.writeDataPage(3, 4, BytesInput.fromInt(0), stats, BIT_PACKED, BIT_PACKED, PLAIN); + w.endColumn(); + w.endBlock(); + w.startBlock(4); + w.startColumn(c1, 8, codec); + + w.writeDataPage(7, 4, BytesInput.fromInt(0), stats, BIT_PACKED, BIT_PACKED, PLAIN); + w.endColumn(); + w.endBlock(); + w.end(new HashMap<String, String>()); + + // reconstruct the stats for the InternalDataFile testing object + + boolean minStat = stats.genericGetMin(); + + boolean maxStat = stats.genericGetMax(); + PrimitiveType primitiveType = + new PrimitiveType(Repetition.REQUIRED, PrimitiveTypeName.BOOLEAN, "b"); + List<Integer> col1NumValTotSize = + new ArrayList<>(Arrays.asList(5, 8)); // (5, 8)// start column indexes + List<Integer> col2NumValTotSize = new ArrayList<>(Arrays.asList(54, 27)); + List<ColumnStat> testColumnStats = new ArrayList<>(); + String[] columnDotPath = {"a.b", "a.b"}; + for (int i = 0; i < columnDotPath.length; i++) { + testColumnStats.add( + ColumnStat.builder() + .field( + InternalField.builder() + .name(primitiveType.getName()) + .parentPath(null) + .schema(schemaExtractor.toInternalSchema(primitiveType, columnDotPath[i])) + .build()) + .numValues(col1NumValTotSize.get(i)) + .totalSize(col2NumValTotSize.get(i)) + .range(Range.vector(minStat, maxStat)) + .build()); + } + + return testColumnStats; + } + + public static List<ColumnStat> initStringFileTest(File file) throws IOException { + // create the parquet file by parsing a schema + Path path = new Path(file.toURI()); + Configuration configuration = new Configuration(); + + MessageType schema = + MessageTypeParser.parseMessageType( + "message m { required group a {required fixed_len_byte_array(10) b;}}"); + String[] columnPath = {"a", "b"}; + ColumnDescriptor c1 = schema.getColumnDescription(columnPath); + CompressionCodecName codec = CompressionCodecName.UNCOMPRESSED; + BinaryStatistics stats = new BinaryStatistics(); + stats.updateStats(Binary.fromString("1")); + stats.updateStats(Binary.fromString("2")); + stats.updateStats(Binary.fromString("5")); + + byte[] bytes1 = "First string".getBytes(); + byte[] bytes2 = "Second string".getBytes(); + + // write the string columned file + + ParquetFileWriter w = new ParquetFileWriter(configuration, schema, path); + w.start(); + w.startBlock(3); + w.startColumn(c1, 5, codec); + + w.writeDataPage(2, 4, BytesInput.from(bytes1), stats, BIT_PACKED, BIT_PACKED, PLAIN); + w.writeDataPage(3, 4, BytesInput.from(bytes2), stats, BIT_PACKED, BIT_PACKED, PLAIN); + w.endColumn(); + w.endBlock(); + w.startBlock(4); + w.startColumn(c1, 8, codec); + + w.writeDataPage(7, 4, BytesInput.from(bytes2), stats, BIT_PACKED, BIT_PACKED, PLAIN); + w.endColumn(); + w.endBlock(); + w.end(new HashMap<String, String>()); + + // reconstruct the stats for the InternalDataFile testing object + BinaryStatistics stats_clone = new BinaryStatistics(); + Binary 
originalBinary1 = Binary.fromString("1"); + Binary originalBinary2 = Binary.fromString("2"); + Binary originalBinary5 = Binary.fromString("5"); + stats_clone.updateStats(Binary.fromByteArray(originalBinary1.getBytes())); + stats_clone.updateStats(Binary.fromByteArray(originalBinary2.getBytes())); + stats_clone.updateStats(Binary.fromByteArray(originalBinary5.getBytes())); + + Binary minStat = stats_clone.genericGetMin(); + + Binary maxStat = stats_clone.genericGetMax(); + PrimitiveType primitiveType = + new PrimitiveType(Repetition.REQUIRED, PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, "b"); + List<Integer> col1NumValTotSize = new ArrayList<>(Arrays.asList(5, 8)); + List<Integer> col2NumValTotSize = new ArrayList<>(Arrays.asList(71, 36)); + List<ColumnStat> testColumnStats = new ArrayList<>(); + String[] columnDotPath = {"a.b", "a.b"}; + for (int i = 0; i < columnDotPath.length; i++) { + testColumnStats.add( + ColumnStat.builder() + .field( + InternalField.builder() + .name(primitiveType.getName()) + .parentPath(null) + .schema(schemaExtractor.toInternalSchema(primitiveType, columnDotPath[i])) + .build()) + .numValues(col1NumValTotSize.get(i)) + .totalSize(col2NumValTotSize.get(i)) + .range(Range.vector(minStat, maxStat)) + .build()); + } + + return testColumnStats; + } + + public static List<ColumnStat> initBinaryFileTest(File file) throws IOException { + // create the parquet file by parsing a schema + Path path = new Path(file.toURI()); + Configuration configuration = new Configuration(); + + MessageType schema = + MessageTypeParser.parseMessageType("message m { required group a {required binary b;}}"); + String[] columnPath = {"a", "b"}; + ColumnDescriptor c1 = schema.getColumnDescription(columnPath); + + byte[] bytes1 = {0, 1, 2, 3}; + byte[] bytes2 = {2, 3, 4, 5}; + CompressionCodecName codec = CompressionCodecName.UNCOMPRESSED; + BinaryStatistics stats = new BinaryStatistics(); + stats.updateStats(Binary.fromString("1")); + stats.updateStats(Binary.fromString("2")); + stats.updateStats(Binary.fromString("5")); + + // to simplify the test we keep the same stats for both columns + ParquetFileWriter w = new ParquetFileWriter(configuration, schema, path); + w.start(); + w.startBlock(3); + w.startColumn(c1, 5, codec); + + w.writeDataPage(2, 4, BytesInput.from(bytes1), stats, BIT_PACKED, BIT_PACKED, PLAIN); + + w.writeDataPage(3, 4, BytesInput.from(bytes2), stats, BIT_PACKED, BIT_PACKED, PLAIN); + + w.endColumn(); + w.endBlock(); + w.startBlock(4); + + w.startColumn(c1, 1, codec); + w.writeDataPage(7, 4, BytesInput.from(bytes2), stats, BIT_PACKED, BIT_PACKED, PLAIN); + + w.endColumn(); + w.endBlock(); + w.end(new HashMap<String, String>()); + + // reconstruct the stats for the InternalDataFile testing object + BinaryStatistics stats_clone = new BinaryStatistics(); + Binary originalBinary1 = Binary.fromString("1"); + Binary originalBinary2 = Binary.fromString("2"); + Binary originalBinary5 = Binary.fromString("5"); + stats_clone.updateStats(Binary.fromByteArray(originalBinary1.getBytes())); + stats_clone.updateStats(Binary.fromByteArray(originalBinary2.getBytes())); + stats_clone.updateStats(Binary.fromByteArray(originalBinary5.getBytes())); + + Binary minStat = stats_clone.genericGetMin(); + Binary maxStat = stats_clone.genericGetMax(); + PrimitiveType primitiveType = + new PrimitiveType(Repetition.REQUIRED, PrimitiveTypeName.BINARY, "b"); + List<Integer> col1NumValTotSize = new ArrayList<>(Arrays.asList(5, 1)); + List<Integer> col2NumValTotSize = new ArrayList<>(Arrays.asList(54, 27)); + 
+    List<ColumnStat> testColumnStats = new ArrayList<>();
+    String[] columnDotPath = {"a.b", "a.b"};
+    for (int i = 0; i < columnDotPath.length; i++) {
+      testColumnStats.add(
+          ColumnStat.builder()
+              .field(
+                  InternalField.builder()
+                      .name(primitiveType.getName())
+                      .parentPath(null)
+                      .schema(schemaExtractor.toInternalSchema(primitiveType, columnDotPath[i]))
+                      .build())
+              .numValues(col1NumValTotSize.get(i))
+              .totalSize(col2NumValTotSize.get(i))
+              .range(Range.vector(minStat, maxStat))
+              .build());
+    }
+
+    return testColumnStats;
+  }
+
+  public static List<ColumnStat> initIntFileTest(File file) throws IOException {
+    // create the parquet file by parsing a schema
+    Path path = new Path(file.toURI());
+    Configuration configuration = new Configuration();
+
+    MessageType schema =
+        MessageTypeParser.parseMessageType("message m { required group a {required int32 b;}}");
+    String[] columnPath = {"a", "b"};
+    ColumnDescriptor c1 = schema.getColumnDescription(columnPath);
+
+    CompressionCodecName codec = CompressionCodecName.UNCOMPRESSED;
+
+    IntStatistics stats = new IntStatistics();
+    stats.updateStats(1);
+    stats.updateStats(2);
+    stats.updateStats(5);
+
+    // to simplify the test we keep the same stats for both columns
+    ParquetFileWriter w = new ParquetFileWriter(configuration, schema, path);
+    w.start();
+    w.startBlock(3);
+
+    w.startColumn(c1, 2, codec);
+
+    w.writeDataPage(3, 3, BytesInput.fromInt(3), stats, BIT_PACKED, BIT_PACKED, PLAIN);
+
+    w.writeDataPage(3, 3, BytesInput.fromInt(2), stats, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.endColumn();
+    w.endBlock();
+    w.startBlock(4);
+
+    w.startColumn(c1, 1, codec);
+
+    w.writeDataPage(3, 3, BytesInput.fromInt(1), stats, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.endColumn();
+    w.endBlock();
+    w.end(new HashMap<String, String>());
+
+    // reconstruct the stats for the InternalDataFile testing object
+
+    java.lang.Integer minStat = stats.genericGetMin();
+    java.lang.Integer maxStat = stats.genericGetMax();
+    PrimitiveType primitiveType =
+        new PrimitiveType(Repetition.REQUIRED, PrimitiveTypeName.INT32, "b");
+    List<Integer> col1NumValTotSize = new ArrayList<>(Arrays.asList(2, 1));
+    List<Integer> col2NumValTotSize = new ArrayList<>(Arrays.asList(54, 27));
+    List<ColumnStat> testColumnStats = new ArrayList<>();
+    String[] columnDotPath = {"a.b", "a.b"};
+    for (int i = 0; i < columnDotPath.length; i++) {
+      testColumnStats.add(
+          ColumnStat.builder()
+              .field(
+                  InternalField.builder()
+                      .name(primitiveType.getName())
+                      .parentPath(null)
+                      .schema(schemaExtractor.toInternalSchema(primitiveType, columnDotPath[i]))
+                      .build())
+              .numValues(col1NumValTotSize.get(i))
+              .totalSize(col2NumValTotSize.get(i))
+              .range(Range.vector(minStat, maxStat))
+              .build());
+    }
+
+    return testColumnStats;
+  }
+
+  @Test
+  public void testInternalDataFileStringStat() throws IOException {
+
+    Configuration configuration = new Configuration();
+
+    java.nio.file.Path path = tempDir.resolve("parquet-test-string-file");
+    File file = path.toFile();
+    file.deleteOnExit();
+    List<ColumnStat> testColumnStats = initStringFileTest(file);
+    Path hadoopPath = new Path(file.toURI());
+    // statsExtractor toInternalDataFile testing
+    InternalDataFile internalDataFile =
+        ParquetStatsExtractor.toInternalDataFile(configuration, hadoopPath);
+    InternalDataFile testInternalFile =
+        InternalDataFile.builder()
+            .physicalPath(
+                "file:"
+                    .concat(
+                        file.toPath().normalize().toAbsolutePath().toString().replace("\\", "/")))
+            .columnStats(testColumnStats)
+            .fileFormat(FileFormat.APACHE_PARQUET)
+            .lastModified(file.lastModified())
+            .fileSizeBytes(file.length())
+            .recordCount(8)
+            .build();
+
+    Assertions.assertEquals(testInternalFile, internalDataFile);
+  }
+
+  @Test
+  public void testInternalDataFileBinaryStat() throws IOException {
+
+    Configuration configuration = new Configuration();
+
+    java.nio.file.Path path = tempDir.resolve("parquet-test-binary-file");
+    File file = path.toFile();
+    file.deleteOnExit();
+    List<ColumnStat> testColumnStats = initBinaryFileTest(file);
+    Path hadoopPath = new Path(file.toURI());
+    // statsExtractor toInternalDataFile testing
+    InternalDataFile internalDataFile =
+        ParquetStatsExtractor.toInternalDataFile(configuration, hadoopPath);
+    InternalDataFile testInternalFile =
+        InternalDataFile.builder()
+            .physicalPath(
+                "file:"
+                    .concat(
+                        file.toPath().normalize().toAbsolutePath().toString().replace("\\", "/")))
+            .columnStats(testColumnStats)
+            .fileFormat(FileFormat.APACHE_PARQUET)
+            .lastModified(file.lastModified())
+            .fileSizeBytes(file.length())
+            .recordCount(5)
+            .build();
+
+    Assertions.assertEquals(testInternalFile, internalDataFile);
+  }
+
+  @Test
+  public void testInternalDataFileIntStat() throws IOException {
+
+    Configuration configuration = new Configuration();
+    java.nio.file.Path path = tempDir.resolve("parquet-test-int-file");
+    File file = path.toFile();
+    file.deleteOnExit();
+    List<ColumnStat> testColumnStats = initIntFileTest(file);
+    Path hadoopPath = new Path(file.toURI());
+    // statsExtractor toInternalDataFile testing
+    InternalDataFile internalDataFile =
+        ParquetStatsExtractor.toInternalDataFile(configuration, hadoopPath);
+    InternalDataFile testInternalFile =
+        InternalDataFile.builder()
+            .physicalPath(
+                "file:"
+                    .concat(
+                        file.toPath().normalize().toAbsolutePath().toString().replace("\\", "/")))
+            .columnStats(testColumnStats)
+            .fileFormat(FileFormat.APACHE_PARQUET)
+            .lastModified(file.lastModified())
+            .fileSizeBytes(file.length())
+            .recordCount(2)
+            .build();
+
+    Assertions.assertEquals(testInternalFile, internalDataFile);
+  }
+
+  @Test
+  public void testInternalDataFileBooleanStat() throws IOException {
+    Configuration configuration = new Configuration();
+
+    java.nio.file.Path path = tempDir.resolve("parquet-test-boolean-file");
+    File file = path.toFile();
+    file.deleteOnExit();
+
+    List<ColumnStat> testColumnStats = initBooleanFileTest(file);
+    Path hadoopPath = new Path(file.toURI());
+    // statsExtractor toInternalDataFile testing
+    InternalDataFile internalDataFile =
+        ParquetStatsExtractor.toInternalDataFile(configuration, hadoopPath);
+    InternalDataFile testInternalFile =
+        InternalDataFile.builder()
+            .physicalPath(
+                "file:"
+                    .concat(
+                        file.toPath().normalize().toAbsolutePath().toString().replace("\\", "/")))
+            .columnStats(testColumnStats)
+            .fileFormat(FileFormat.APACHE_PARQUET)
+            .lastModified(file.lastModified())
+            .fileSizeBytes(file.length())
+            .recordCount(8)
+            .build();
+
+    Assertions.assertEquals(testInternalFile, internalDataFile);
+  }
+}
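[Editor's note] For readers skimming this patch, a minimal usage sketch of the new stats extractor follows. It is not part of the commit: the file path and package locations of the XTable classes are assumptions; only the call exercised by the tests above (ParquetStatsExtractor.toInternalDataFile) is taken from the diff.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    // Package locations below are assumed, not shown in this diff.
    import org.apache.xtable.model.storage.InternalDataFile;
    import org.apache.xtable.parquet.ParquetStatsExtractor;

    public class ParquetStatsExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Hypothetical local Parquet file; any path readable through the Hadoop FileSystem works.
        Path parquetPath = new Path("file:///tmp/example.parquet");
        // Reads the Parquet footer and maps file-level metadata (size, record count,
        // last-modified time) plus per-column min/max ranges into XTable's
        // InternalDataFile / ColumnStat model, as asserted by the tests above.
        InternalDataFile dataFile = ParquetStatsExtractor.toInternalDataFile(conf, parquetPath);
        System.out.println(dataFile);
      }
    }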
diff --git a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java
index 43549d1b..e04d6985 100644
--- a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java
+++ b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java
@@ -297,11 +297,13 @@ public class RunCatalogSync {
      * necessary connection and access details for describing and listing tables
      */
     ExternalCatalogConfig sourceCatalog;
+
     /**
      * Defines configuration one or more target catalogs, to which XTable will write or update
      * tables. Unlike the source, these catalogs must be writable
      */
     List<ExternalCatalogConfig> targetCatalogs;
+
     /** A list of datasets that specify how a source table maps to one or more target tables. */
     List<Dataset> datasets;
 
@@ -314,6 +316,7 @@ public class RunCatalogSync {
   public static class Dataset {
     /** Identifies the source table in sourceCatalog. */
     SourceTableIdentifier sourceCatalogTableIdentifier;
+
     /** A list of one or more targets that this source table should be written to. */
     List<TargetTableIdentifier> targetCatalogTableIdentifiers;
   }
@@ -324,6 +327,7 @@ public class RunCatalogSync {
   public static class SourceTableIdentifier {
     /** Specifies the table identifier in the source catalog. */
     TableIdentifier tableIdentifier;
+
     /**
      * (Optional) Provides direct storage details such as a table’s base path (like an S3
      * location) and the partition specification. This allows reading from a source even if it is
@@ -341,11 +345,13 @@ public class RunCatalogSync {
      * updated
      */
     String catalogId;
+
     /**
      * The target table format (e.g., DELTA, HUDI, ICEBERG), specifying how the data will be
      * stored at the target.
      */
     String tableFormat;
+
     /** Specifies the table identifier in the target catalog. */
     TableIdentifier tableIdentifier;
   }
@@ -359,6 +365,7 @@ public class RunCatalogSync {
      * HierarchicalTableIdentifier}
      */
     String hierarchicalId;
+
     /** Specifies the partition spec of the table */
     String partitionSpec;
   }
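[Editor's note] To make the catalog-sync configuration described by the javadoc above easier to picture, here is a rough, YAML-style outline of how the pieces nest. It is illustrative only: the field names come from this diff, but the YAML layout, the catalogId-only catalog entries, and the placement of hierarchicalId and partitionSpec under tableIdentifier are assumptions rather than part of this commit.

    # Illustrative sketch only -- values are invented, other catalog connection
    # properties are elided, and the exact file layout is not defined by this diff.
    sourceCatalog:
      catalogId: "source-catalog"          # connection/access details for describing and listing tables
    targetCatalogs:
      - catalogId: "target-catalog"        # unlike the source, must be writable
    datasets:
      - sourceCatalogTableIdentifier:
          tableIdentifier:
            hierarchicalId: "db.orders"    # assumed nesting of the hierarchical identifier
            partitionSpec: "region"        # hypothetical partition spec
        targetCatalogTableIdentifiers:
          - catalogId: "target-catalog"
            tableFormat: "ICEBERG"         # e.g., DELTA, HUDI, ICEBERG
            tableIdentifier:
              hierarchicalId: "analytics.orders"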