the-other-tim-brown commented on code in PR #669: URL: https://github.com/apache/incubator-xtable/pull/669#discussion_r2040718744
########## xtable-api/src/main/java/org/apache/xtable/model/storage/InternalDataFile.java: ########## @@ -52,4 +52,20 @@ public class InternalDataFile extends InternalFile { @Builder.Default @NonNull List<ColumnStat> columnStats = Collections.emptyList(); // last modified time in millis since epoch long lastModified; + public static InternalDataFileBuilder builderFrom(InternalDataFile dataFile) { + return dataFile.toBuilder(); + } + + public static boolean compareFiles(InternalDataFile obj1, InternalDataFile obj2) { Review Comment: This looks like it is just for testing, let's move it to a test class if necessary. Why can't you use `equals` though? ########## xtable-core/src/main/java/org/apache/xtable/parquet/ParquetSchemaExtractor.java: ########## @@ -0,0 +1,487 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.parquet; + +import java.util.HashMap; +import java.util.Map; +import java.util.List; +import java.util.ArrayList; + +import org.apache.xtable.schema.SchemaUtils; +import org.apache.xtable.exception.SchemaExtractorException; + +import java.util.Collections; +import java.util.Optional; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; + +import java.util.Arrays; +import java.util.Collections; +import java.util.function.Function; +import java.util.stream.Collectors; + +import org.apache.xtable.hudi.idtracking.models.IdMapping; +import org.apache.avro.Schema; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; +import org.apache.xtable.collectors.CustomCollectors; +import org.apache.xtable.exception.UnsupportedSchemaTypeException; +import org.apache.xtable.model.schema.InternalField; +import org.apache.parquet.schema.Type.Repetition; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.schema.InternalType; +import org.apache.parquet.schema.Type.ID; +import org.apache.parquet.schema.OriginalType; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.schema.Types; +import org.apache.parquet.column.ColumnDescriptor; + + +/** + * Class that converts parquet Schema {@link Schema} to Canonical Schema {@link InternalSchema} and + * vice-versa. This conversion is fully reversible and there is a strict 1 to 1 mapping between + * parquet data types and canonical data types. + */ +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class ParquetSchemaExtractor { + // parquet only supports string keys in maps + private static final InternalField MAP_KEY_FIELD = + InternalField.builder() + .name(InternalField.Constants.MAP_KEY_FIELD_NAME) + .schema( + InternalSchema.builder() + .name("map_key") + .dataType(InternalType.STRING) + .isNullable(false) + .build()) + .defaultValue("") + .build(); + private static final ParquetSchemaExtractor INSTANCE = new ParquetSchemaExtractor(); + private static final String ELEMENT = "element"; + private static final String KEY = "key"; + private static final String VALUE = "value"; + + public static ParquetSchemaExtractor getInstance() { + return INSTANCE; + } + + private static boolean isNullable(Type schema) { + return schema.getRepetition() == Repetition.REQUIRED ? false : true; Review Comment: Simplify to `schema.getRepetition() != Repetition.REQUIRED` ########## xtable-core/src/main/java/org/apache/xtable/parquet/ParquetStatsExtractor.java: ########## @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.parquet; + +import org.apache.xtable.model.schema.InternalSchema; + +import java.util.Collection; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Map; +import java.util.HashMap; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.TreeSet; +import java.util.Optional; + +import org.apache.xtable.model.stat.PartitionValue; +import org.apache.xtable.model.stat.ColumnStat; +import org.apache.xtable.model.stat.Range; +import lombok.Builder; +import org.apache.xtable.model.schema.InternalField; +import lombok.Value; +import org.apache.xtable.model.storage.InternalDataFile; +import org.apache.hadoop.fs.*; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.Encoding; +import org.apache.parquet.column.statistics.Statistics; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.schema.MessageType; +import org.apache.xtable.model.storage.FileFormat; +import org.apache.xtable.model.config.InputPartitionFields; +import org.apache.hadoop.conf.Configuration; + +@Value +@Builder +public class ParquetStatsExtractor { + + private static final ParquetStatsExtractor INSTANCE = null; // new ParquetStatsExtractor(); + @Builder.Default Review Comment: If you want to use a Builder then these variables cannot be static ########## xtable-core/src/main/java/org/apache/xtable/parquet/ParquetSchemaExtractor.java: ########## @@ -0,0 +1,487 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.parquet; + +import java.util.HashMap; +import java.util.Map; +import java.util.List; +import java.util.ArrayList; + +import org.apache.xtable.schema.SchemaUtils; +import org.apache.xtable.exception.SchemaExtractorException; + +import java.util.Collections; +import java.util.Optional; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; + +import java.util.Arrays; +import java.util.Collections; +import java.util.function.Function; +import java.util.stream.Collectors; + +import org.apache.xtable.hudi.idtracking.models.IdMapping; +import org.apache.avro.Schema; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; +import org.apache.xtable.collectors.CustomCollectors; +import org.apache.xtable.exception.UnsupportedSchemaTypeException; +import org.apache.xtable.model.schema.InternalField; +import org.apache.parquet.schema.Type.Repetition; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.schema.InternalType; +import org.apache.parquet.schema.Type.ID; +import org.apache.parquet.schema.OriginalType; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.schema.Types; +import org.apache.parquet.column.ColumnDescriptor; + + +/** + * Class that converts parquet Schema {@link Schema} to Canonical Schema {@link InternalSchema} and + * vice-versa. This conversion is fully reversible and there is a strict 1 to 1 mapping between + * parquet data types and canonical data types. + */ +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class ParquetSchemaExtractor { + // parquet only supports string keys in maps + private static final InternalField MAP_KEY_FIELD = + InternalField.builder() + .name(InternalField.Constants.MAP_KEY_FIELD_NAME) + .schema( + InternalSchema.builder() + .name("map_key") + .dataType(InternalType.STRING) + .isNullable(false) + .build()) + .defaultValue("") + .build(); + private static final ParquetSchemaExtractor INSTANCE = new ParquetSchemaExtractor(); + private static final String ELEMENT = "element"; + private static final String KEY = "key"; + private static final String VALUE = "value"; + + public static ParquetSchemaExtractor getInstance() { + return INSTANCE; + } + + private static boolean isNullable(Type schema) { + return schema.getRepetition() == Repetition.REQUIRED ? false : true; + } + + + /** + * Converts the parquet {@link Schema} to {@link InternalSchema}. + * + * @param schema The schema being converted + * @param parentPath If this schema is nested within another, this will be a dot separated string + * representing the path from the top most field to the current schema. + * @return a converted schema + */ + public InternalSchema toInternalSchema( + Type schema, String parentPath) { + InternalType newDataType = null; + Type.Repetition currentRepetition = null; + List<InternalField> subFields = new ArrayList<>(); + PrimitiveType primitiveType; + LogicalTypeAnnotation logicalType; + Map<InternalSchema.MetadataKey, Object> metadata = new HashMap<>(); + String elementName = schema.getName(); + if (schema.isPrimitive()) { + primitiveType = schema.asPrimitiveType(); + switch (primitiveType.getPrimitiveTypeName()) { + // PrimitiveTypes + case INT64: + logicalType = schema.getLogicalTypeAnnotation(); + if (logicalType instanceof LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) { + LogicalTypeAnnotation.TimeUnit timeUnit = + ((LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) logicalType).getUnit(); + if (timeUnit == LogicalTypeAnnotation.TimeUnit.MICROS) { + newDataType = InternalType.TIMESTAMP; + metadata.put( + InternalSchema.MetadataKey.TIMESTAMP_PRECISION, + InternalSchema.MetadataValue.MICROS); + } else if (timeUnit == LogicalTypeAnnotation.TimeUnit.MILLIS) { + newDataType = InternalType.TIMESTAMP_NTZ; + metadata.put( + InternalSchema.MetadataKey.TIMESTAMP_PRECISION, + InternalSchema.MetadataValue.MILLIS); + } else if (timeUnit == LogicalTypeAnnotation.TimeUnit.NANOS) { + newDataType = InternalType.TIMESTAMP_NTZ; + metadata.put( + InternalSchema.MetadataKey.TIMESTAMP_PRECISION, + InternalSchema.MetadataValue.NANOS); + } + } else if (logicalType instanceof LogicalTypeAnnotation.IntLogicalTypeAnnotation) { + newDataType = InternalType.INT; + } else if (logicalType instanceof LogicalTypeAnnotation.TimeLogicalTypeAnnotation) { + LogicalTypeAnnotation.TimeUnit timeUnit = ((LogicalTypeAnnotation.TimeLogicalTypeAnnotation) logicalType).getUnit(); + if (timeUnit == LogicalTypeAnnotation.TimeUnit.MICROS || timeUnit == LogicalTypeAnnotation.TimeUnit.NANOS) { + newDataType = InternalType.INT; + } + } else if (logicalType + instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) { + + metadata.put( + InternalSchema.MetadataKey.DECIMAL_PRECISION, + ((LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalType).getPrecision()); + metadata.put( + InternalSchema.MetadataKey.DECIMAL_SCALE, + ((LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalType).getScale()); + newDataType = InternalType.DECIMAL; + + } else { + newDataType = InternalType.LONG; + } + break; + case INT32: + logicalType = schema.getLogicalTypeAnnotation(); + if (logicalType instanceof LogicalTypeAnnotation.DateLogicalTypeAnnotation) { + newDataType = InternalType.DATE; + } else if (logicalType instanceof LogicalTypeAnnotation.TimeLogicalTypeAnnotation) { + LogicalTypeAnnotation.TimeUnit timeUnit = ((LogicalTypeAnnotation.TimeLogicalTypeAnnotation) logicalType).getUnit(); + if (timeUnit == LogicalTypeAnnotation.TimeUnit.MILLIS) { + newDataType = InternalType.INT; + } + } else if (logicalType + instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) { + metadata.put( + InternalSchema.MetadataKey.DECIMAL_PRECISION, + ((LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalType).getPrecision()); + metadata.put( + InternalSchema.MetadataKey.DECIMAL_SCALE, + ((LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalType).getScale()); + newDataType = InternalType.DECIMAL; + + } else { + newDataType = InternalType.INT; + } + break; + case FLOAT: + logicalType = schema.getLogicalTypeAnnotation(); Review Comment: Remove this unused variable ########## xtable-core/src/main/java/org/apache/xtable/parquet/ParquetSchemaExtractor.java: ########## @@ -0,0 +1,487 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.parquet; + +import java.util.HashMap; +import java.util.Map; +import java.util.List; +import java.util.ArrayList; + +import org.apache.xtable.schema.SchemaUtils; +import org.apache.xtable.exception.SchemaExtractorException; + +import java.util.Collections; +import java.util.Optional; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; + +import java.util.Arrays; +import java.util.Collections; +import java.util.function.Function; +import java.util.stream.Collectors; + +import org.apache.xtable.hudi.idtracking.models.IdMapping; +import org.apache.avro.Schema; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; +import org.apache.xtable.collectors.CustomCollectors; +import org.apache.xtable.exception.UnsupportedSchemaTypeException; +import org.apache.xtable.model.schema.InternalField; +import org.apache.parquet.schema.Type.Repetition; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.schema.InternalType; +import org.apache.parquet.schema.Type.ID; +import org.apache.parquet.schema.OriginalType; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.schema.Types; +import org.apache.parquet.column.ColumnDescriptor; + + +/** + * Class that converts parquet Schema {@link Schema} to Canonical Schema {@link InternalSchema} and + * vice-versa. This conversion is fully reversible and there is a strict 1 to 1 mapping between + * parquet data types and canonical data types. + */ +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class ParquetSchemaExtractor { + // parquet only supports string keys in maps + private static final InternalField MAP_KEY_FIELD = + InternalField.builder() + .name(InternalField.Constants.MAP_KEY_FIELD_NAME) + .schema( + InternalSchema.builder() + .name("map_key") + .dataType(InternalType.STRING) + .isNullable(false) + .build()) + .defaultValue("") + .build(); + private static final ParquetSchemaExtractor INSTANCE = new ParquetSchemaExtractor(); + private static final String ELEMENT = "element"; + private static final String KEY = "key"; + private static final String VALUE = "value"; + + public static ParquetSchemaExtractor getInstance() { + return INSTANCE; + } + + private static boolean isNullable(Type schema) { + return schema.getRepetition() == Repetition.REQUIRED ? false : true; + } + + + /** + * Converts the parquet {@link Schema} to {@link InternalSchema}. + * + * @param schema The schema being converted + * @param parentPath If this schema is nested within another, this will be a dot separated string + * representing the path from the top most field to the current schema. + * @return a converted schema + */ + public InternalSchema toInternalSchema( + Type schema, String parentPath) { + InternalType newDataType = null; + Type.Repetition currentRepetition = null; + List<InternalField> subFields = new ArrayList<>(); + PrimitiveType primitiveType; + LogicalTypeAnnotation logicalType; + Map<InternalSchema.MetadataKey, Object> metadata = new HashMap<>(); Review Comment: You can use `new EnumMap<>(InternalSchema.MetadataKey.class)` to be a bit more memory efficient ########## xtable-core/src/main/java/org/apache/xtable/parquet/ParquetStatsExtractor.java: ########## @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.parquet; + +import org.apache.xtable.model.schema.InternalSchema; + +import java.util.Collection; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Map; +import java.util.HashMap; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.TreeSet; +import java.util.Optional; + +import org.apache.xtable.model.stat.PartitionValue; +import org.apache.xtable.model.stat.ColumnStat; +import org.apache.xtable.model.stat.Range; +import lombok.Builder; +import org.apache.xtable.model.schema.InternalField; +import lombok.Value; +import org.apache.xtable.model.storage.InternalDataFile; +import org.apache.hadoop.fs.*; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.Encoding; +import org.apache.parquet.column.statistics.Statistics; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.schema.MessageType; +import org.apache.xtable.model.storage.FileFormat; +import org.apache.xtable.model.config.InputPartitionFields; +import org.apache.hadoop.conf.Configuration; + +@Value +@Builder +public class ParquetStatsExtractor { + + private static final ParquetStatsExtractor INSTANCE = null; // new ParquetStatsExtractor(); + @Builder.Default + private static final ParquetPartitionValueExtractor partitionExtractor = + ParquetPartitionValueExtractor.getInstance(); + @Builder.Default + private static final ParquetSchemaExtractor schemaExtractor = + ParquetSchemaExtractor.getInstance(); + @Builder.Default + private static final ParquetMetadataExtractor parquetMetadataExtractor = + ParquetMetadataExtractor.getInstance(); + + private static final InputPartitionFields partitions = null; + + public static ParquetStatsExtractor getInstance() { + return INSTANCE; + } + + public static List<ColumnStat> getColumnStatsForaFile(ParquetMetadata footer) { + return getStatsForaFile(footer).values().stream() + .flatMap(List::stream) + .collect(Collectors.toList()); + } + + private static Optional<Long> getMaxFromColumnStats(List<ColumnStat> columnStats) { + return columnStats.stream() + .filter(entry -> entry.getField().getParentPath() == null) + .map(ColumnStat::getNumValues) + .filter(numValues -> numValues > 0) + .max(Long::compareTo); + } + + + public static Map<ColumnDescriptor, List<ColumnStat>> getStatsForaFile(ParquetMetadata footer) { + List<ColumnStat> colStat = new ArrayList<ColumnStat>(); Review Comment: Remove this unused list ########## xtable-core/src/main/java/org/apache/xtable/parquet/ParquetSchemaExtractor.java: ########## @@ -0,0 +1,487 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.parquet; + +import java.util.HashMap; +import java.util.Map; +import java.util.List; +import java.util.ArrayList; + +import org.apache.xtable.schema.SchemaUtils; +import org.apache.xtable.exception.SchemaExtractorException; + +import java.util.Collections; +import java.util.Optional; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; + +import java.util.Arrays; +import java.util.Collections; +import java.util.function.Function; +import java.util.stream.Collectors; + +import org.apache.xtable.hudi.idtracking.models.IdMapping; +import org.apache.avro.Schema; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; +import org.apache.xtable.collectors.CustomCollectors; +import org.apache.xtable.exception.UnsupportedSchemaTypeException; +import org.apache.xtable.model.schema.InternalField; +import org.apache.parquet.schema.Type.Repetition; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.schema.InternalType; +import org.apache.parquet.schema.Type.ID; +import org.apache.parquet.schema.OriginalType; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.schema.Types; +import org.apache.parquet.column.ColumnDescriptor; + + +/** + * Class that converts parquet Schema {@link Schema} to Canonical Schema {@link InternalSchema} and + * vice-versa. This conversion is fully reversible and there is a strict 1 to 1 mapping between + * parquet data types and canonical data types. + */ +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class ParquetSchemaExtractor { + // parquet only supports string keys in maps + private static final InternalField MAP_KEY_FIELD = + InternalField.builder() + .name(InternalField.Constants.MAP_KEY_FIELD_NAME) + .schema( + InternalSchema.builder() + .name("map_key") + .dataType(InternalType.STRING) + .isNullable(false) + .build()) + .defaultValue("") + .build(); + private static final ParquetSchemaExtractor INSTANCE = new ParquetSchemaExtractor(); + private static final String ELEMENT = "element"; + private static final String KEY = "key"; + private static final String VALUE = "value"; + + public static ParquetSchemaExtractor getInstance() { + return INSTANCE; + } + + private static boolean isNullable(Type schema) { + return schema.getRepetition() == Repetition.REQUIRED ? false : true; + } + + + /** + * Converts the parquet {@link Schema} to {@link InternalSchema}. + * + * @param schema The schema being converted + * @param parentPath If this schema is nested within another, this will be a dot separated string + * representing the path from the top most field to the current schema. + * @return a converted schema + */ + public InternalSchema toInternalSchema( + Type schema, String parentPath) { + InternalType newDataType = null; + Type.Repetition currentRepetition = null; + List<InternalField> subFields = new ArrayList<>(); + PrimitiveType primitiveType; + LogicalTypeAnnotation logicalType; + Map<InternalSchema.MetadataKey, Object> metadata = new HashMap<>(); + String elementName = schema.getName(); + if (schema.isPrimitive()) { + primitiveType = schema.asPrimitiveType(); + switch (primitiveType.getPrimitiveTypeName()) { + // PrimitiveTypes + case INT64: + logicalType = schema.getLogicalTypeAnnotation(); + if (logicalType instanceof LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) { + LogicalTypeAnnotation.TimeUnit timeUnit = + ((LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) logicalType).getUnit(); + if (timeUnit == LogicalTypeAnnotation.TimeUnit.MICROS) { + newDataType = InternalType.TIMESTAMP; + metadata.put( + InternalSchema.MetadataKey.TIMESTAMP_PRECISION, + InternalSchema.MetadataValue.MICROS); + } else if (timeUnit == LogicalTypeAnnotation.TimeUnit.MILLIS) { + newDataType = InternalType.TIMESTAMP_NTZ; + metadata.put( + InternalSchema.MetadataKey.TIMESTAMP_PRECISION, + InternalSchema.MetadataValue.MILLIS); + } else if (timeUnit == LogicalTypeAnnotation.TimeUnit.NANOS) { + newDataType = InternalType.TIMESTAMP_NTZ; + metadata.put( + InternalSchema.MetadataKey.TIMESTAMP_PRECISION, + InternalSchema.MetadataValue.NANOS); + } + } else if (logicalType instanceof LogicalTypeAnnotation.IntLogicalTypeAnnotation) { + newDataType = InternalType.INT; + } else if (logicalType instanceof LogicalTypeAnnotation.TimeLogicalTypeAnnotation) { + LogicalTypeAnnotation.TimeUnit timeUnit = ((LogicalTypeAnnotation.TimeLogicalTypeAnnotation) logicalType).getUnit(); + if (timeUnit == LogicalTypeAnnotation.TimeUnit.MICROS || timeUnit == LogicalTypeAnnotation.TimeUnit.NANOS) { + newDataType = InternalType.INT; + } + } else if (logicalType + instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) { + + metadata.put( + InternalSchema.MetadataKey.DECIMAL_PRECISION, + ((LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalType).getPrecision()); + metadata.put( + InternalSchema.MetadataKey.DECIMAL_SCALE, + ((LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalType).getScale()); + newDataType = InternalType.DECIMAL; + + } else { + newDataType = InternalType.LONG; + } + break; + case INT32: + logicalType = schema.getLogicalTypeAnnotation(); + if (logicalType instanceof LogicalTypeAnnotation.DateLogicalTypeAnnotation) { + newDataType = InternalType.DATE; + } else if (logicalType instanceof LogicalTypeAnnotation.TimeLogicalTypeAnnotation) { + LogicalTypeAnnotation.TimeUnit timeUnit = ((LogicalTypeAnnotation.TimeLogicalTypeAnnotation) logicalType).getUnit(); + if (timeUnit == LogicalTypeAnnotation.TimeUnit.MILLIS) { + newDataType = InternalType.INT; + } + } else if (logicalType + instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) { + metadata.put( + InternalSchema.MetadataKey.DECIMAL_PRECISION, + ((LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalType).getPrecision()); + metadata.put( + InternalSchema.MetadataKey.DECIMAL_SCALE, + ((LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalType).getScale()); + newDataType = InternalType.DECIMAL; + + } else { + newDataType = InternalType.INT; + } + break; + case FLOAT: + logicalType = schema.getLogicalTypeAnnotation(); + newDataType = InternalType.FLOAT; + break; + case FIXED_LEN_BYTE_ARRAY: + logicalType = schema.getLogicalTypeAnnotation(); + if (logicalType instanceof LogicalTypeAnnotation.UUIDLogicalTypeAnnotation) { + newDataType = InternalType.UUID; + } else if (logicalType instanceof LogicalTypeAnnotation.IntervalLogicalTypeAnnotation) { + metadata.put(InternalSchema.MetadataKey.FIXED_BYTES_SIZE, 12); + newDataType = InternalType.FIXED; + } else if (logicalType + instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) { + metadata.put( + InternalSchema.MetadataKey.DECIMAL_PRECISION, + ((LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalType).getPrecision()); + metadata.put( + InternalSchema.MetadataKey.DECIMAL_SCALE, + ((LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalType).getScale()); + newDataType = InternalType.DECIMAL; + + } + break; + case BINARY: + // TODO Variant,GEOMETRY, GEOGRAPHY, + logicalType = schema.getLogicalTypeAnnotation(); + if (logicalType instanceof LogicalTypeAnnotation.EnumLogicalTypeAnnotation) { + metadata.put( + InternalSchema.MetadataKey.ENUM_VALUES, logicalType.toOriginalType().values()); + newDataType = InternalType.ENUM; + } else if (logicalType instanceof LogicalTypeAnnotation.JsonLogicalTypeAnnotation) { + newDataType = InternalType.BYTES; + } else if (logicalType instanceof LogicalTypeAnnotation.BsonLogicalTypeAnnotation) { + newDataType = InternalType.BYTES; + } else if (logicalType instanceof LogicalTypeAnnotation.StringLogicalTypeAnnotation) { + newDataType = InternalType.STRING; + } else if (logicalType + instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) { + metadata.put( + InternalSchema.MetadataKey.DECIMAL_PRECISION, + ((LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalType).getPrecision()); + metadata.put( + InternalSchema.MetadataKey.DECIMAL_SCALE, + ((LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalType).getScale()); + newDataType = InternalType.DECIMAL; + + } else { + newDataType = InternalType.BYTES; + } + break; + case BOOLEAN: + newDataType = InternalType.BOOLEAN; + break; + /*case UNKNOWN: + newDataType = InternalType.NULL; + break;*/ + default: + /*if (logicalType instanceof LogicalTypeAnnotation.UnknownLogicalTypeAnnotation){ + newDataType = InternalType.NULL; + } + else {*/ Review Comment: Remove the commented out code in here ########## xtable-core/src/main/java/org/apache/xtable/parquet/ParquetStatsExtractor.java: ########## @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.parquet; + +import org.apache.xtable.model.schema.InternalSchema; + +import java.util.Collection; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Map; +import java.util.HashMap; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.TreeSet; +import java.util.Optional; + +import org.apache.xtable.model.stat.PartitionValue; +import org.apache.xtable.model.stat.ColumnStat; +import org.apache.xtable.model.stat.Range; +import lombok.Builder; +import org.apache.xtable.model.schema.InternalField; +import lombok.Value; +import org.apache.xtable.model.storage.InternalDataFile; +import org.apache.hadoop.fs.*; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.Encoding; +import org.apache.parquet.column.statistics.Statistics; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.schema.MessageType; +import org.apache.xtable.model.storage.FileFormat; +import org.apache.xtable.model.config.InputPartitionFields; +import org.apache.hadoop.conf.Configuration; + +@Value +@Builder +public class ParquetStatsExtractor { + + private static final ParquetStatsExtractor INSTANCE = null; // new ParquetStatsExtractor(); + @Builder.Default + private static final ParquetPartitionValueExtractor partitionExtractor = + ParquetPartitionValueExtractor.getInstance(); + @Builder.Default + private static final ParquetSchemaExtractor schemaExtractor = + ParquetSchemaExtractor.getInstance(); + @Builder.Default + private static final ParquetMetadataExtractor parquetMetadataExtractor = + ParquetMetadataExtractor.getInstance(); + + private static final InputPartitionFields partitions = null; + + public static ParquetStatsExtractor getInstance() { + return INSTANCE; + } + + public static List<ColumnStat> getColumnStatsForaFile(ParquetMetadata footer) { + return getStatsForaFile(footer).values().stream() + .flatMap(List::stream) + .collect(Collectors.toList()); + } + + private static Optional<Long> getMaxFromColumnStats(List<ColumnStat> columnStats) { + return columnStats.stream() + .filter(entry -> entry.getField().getParentPath() == null) + .map(ColumnStat::getNumValues) + .filter(numValues -> numValues > 0) + .max(Long::compareTo); + } + + + public static Map<ColumnDescriptor, List<ColumnStat>> getStatsForaFile(ParquetMetadata footer) { + List<ColumnStat> colStat = new ArrayList<ColumnStat>(); + Map<ColumnDescriptor, List<ColumnStat>> columnDescStats = new HashMap<>(); + MessageType schema = parquetMetadataExtractor.getSchema(footer); + List<ColumnChunkMetaData> columns = new ArrayList<>(); + for (BlockMetaData blockMetaData: footer.getBlocks()){ + columns.addAll(blockMetaData.getColumns()); + } + columnDescStats = + columns + .stream() Review Comment: You can use the streams API and chain these together like: ``` footer.getBlocks().stream().flatMap(blockMetaData -> blockMetaData.getColumns().stream()) ``` ########## xtable-core/src/main/java/org/apache/xtable/parquet/ParquetSchemaExtractor.java: ########## @@ -0,0 +1,487 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.parquet; + +import java.util.HashMap; +import java.util.Map; +import java.util.List; +import java.util.ArrayList; + +import org.apache.xtable.schema.SchemaUtils; +import org.apache.xtable.exception.SchemaExtractorException; + +import java.util.Collections; +import java.util.Optional; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; + +import java.util.Arrays; +import java.util.Collections; +import java.util.function.Function; +import java.util.stream.Collectors; + +import org.apache.xtable.hudi.idtracking.models.IdMapping; +import org.apache.avro.Schema; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; +import org.apache.xtable.collectors.CustomCollectors; +import org.apache.xtable.exception.UnsupportedSchemaTypeException; +import org.apache.xtable.model.schema.InternalField; +import org.apache.parquet.schema.Type.Repetition; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.schema.InternalType; +import org.apache.parquet.schema.Type.ID; +import org.apache.parquet.schema.OriginalType; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.schema.Types; +import org.apache.parquet.column.ColumnDescriptor; + + +/** + * Class that converts parquet Schema {@link Schema} to Canonical Schema {@link InternalSchema} and + * vice-versa. This conversion is fully reversible and there is a strict 1 to 1 mapping between + * parquet data types and canonical data types. + */ +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class ParquetSchemaExtractor { + // parquet only supports string keys in maps + private static final InternalField MAP_KEY_FIELD = + InternalField.builder() + .name(InternalField.Constants.MAP_KEY_FIELD_NAME) + .schema( + InternalSchema.builder() + .name("map_key") + .dataType(InternalType.STRING) + .isNullable(false) + .build()) + .defaultValue("") + .build(); + private static final ParquetSchemaExtractor INSTANCE = new ParquetSchemaExtractor(); + private static final String ELEMENT = "element"; + private static final String KEY = "key"; + private static final String VALUE = "value"; + + public static ParquetSchemaExtractor getInstance() { + return INSTANCE; + } + + private static boolean isNullable(Type schema) { + return schema.getRepetition() == Repetition.REQUIRED ? false : true; + } + + + /** + * Converts the parquet {@link Schema} to {@link InternalSchema}. + * + * @param schema The schema being converted + * @param parentPath If this schema is nested within another, this will be a dot separated string + * representing the path from the top most field to the current schema. + * @return a converted schema + */ + public InternalSchema toInternalSchema( + Type schema, String parentPath) { + InternalType newDataType = null; + Type.Repetition currentRepetition = null; + List<InternalField> subFields = new ArrayList<>(); Review Comment: Can you default this to `null` instead and only initialize the list when it is required? ########## xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetStatsExtractor.java: ########## @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.parquet; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.*; + +import org.apache.parquet.hadoop.ParquetFileReader; + +import java.util.Collections; +import java.util.List; + +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.xtable.model.stat.ColumnStat; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.ParquetReader; +import org.junit.jupiter.api.Test; +import org.apache.parquet.schema.*; +import org.junit.jupiter.api.Assertions; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.schema.InternalType; +import org.apache.parquet.schema.LogicalTypeAnnotation.IntLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.StringLogicalTypeAnnotation; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.schema.Type.Repetition; +import org.apache.parquet.schema.Type; +import org.apache.parquet.schema.Types; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.GroupType; +import org.apache.xtable.model.schema.InternalField; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.schema.InternalType; +import org.apache.parquet.schema.OriginalType; +import org.apache.parquet.schema.MessageTypeParser; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.xtable.model.stat.Range; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.format.Statistics; +import org.apache.parquet.hadoop.ParquetFileWriter; +import org.apache.xtable.model.storage.InternalDataFile; +import org.apache.parquet.column.statistics.IntStatistics; +import org.apache.parquet.column.statistics.BinaryStatistics; +import org.apache.xtable.model.storage.FileFormat; + +import static org.apache.parquet.column.Encoding.BIT_PACKED; +import static org.apache.parquet.column.Encoding.PLAIN; + +import java.io.File; +import java.util.Map; +import java.util.HashMap; +import java.util.Arrays; +import java.util.ArrayList; +import java.io.IOException; + +import lombok.Builder; +import org.apache.parquet.schema.MessageTypeParser; + + + +import org.apache.xtable.model.stat.Range; + + +public class TestParquetStatsExtractor { + + @Builder.Default + private static final ParquetSchemaExtractor schemaExtractor = + ParquetSchemaExtractor.getInstance(); + + public static ParquetFileReader createParquetFile(File file) throws IOException { + Path path = new Path(file.toURI()); + Configuration configuration = new Configuration(); + + MessageType schema = MessageTypeParser.parseMessageType("message m { required group a {required binary b;}}"); + String[] columnPath = {"a", "b"}; + ColumnDescriptor c1 = schema.getColumnDescription(columnPath); + + byte[] bytes1 = {0, 1, 2, 3}; + byte[] bytes2 = {2, 3, 4, 5}; + CompressionCodecName codec = CompressionCodecName.UNCOMPRESSED; + + // include statics using update() + IntStatistics stats = new IntStatistics(); // or BinaryStatistics + stats.updateStats(1); + stats.updateStats(2); + stats.updateStats(5); + + ParquetFileWriter w = new ParquetFileWriter(configuration, schema, path); Review Comment: Make sure to use try-with-resources to properly close the writer ########## xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetSchemaExtractor.java: ########## @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.xtable.parquet; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.*; + +import org.junit.jupiter.api.Test; +import org.apache.parquet.schema.*; +import org.junit.jupiter.api.Assertions; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.schema.InternalType; +import org.apache.parquet.schema.LogicalTypeAnnotation.IntLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.StringLogicalTypeAnnotation; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.schema.Type.Repetition; +import org.apache.parquet.schema.Type; +import org.apache.parquet.schema.Types; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.GroupType; +import org.apache.xtable.model.schema.InternalField; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.schema.InternalType; +import org.apache.parquet.schema.OriginalType; + +import java.util.Map; +import java.util.HashMap; +import java.util.Arrays; +import java.util.Collections; + + +public class TestParquetSchemaExtractor { + private static final ParquetSchemaExtractor schemaExtractor = + ParquetSchemaExtractor.getInstance(); + + @Test + public void testPrimitiveTypes() { + + + InternalSchema primitive1 = + InternalSchema.builder().name("integer").dataType(InternalType.INT).build(); + InternalSchema primitive2 = + InternalSchema.builder().name("string").dataType(InternalType.STRING).build(); + + + + Map<InternalSchema.MetadataKey, Object> fixedDecimalMetadata = new HashMap<>(); + fixedDecimalMetadata.put(InternalSchema.MetadataKey.DECIMAL_PRECISION, 6); + fixedDecimalMetadata.put(InternalSchema.MetadataKey.DECIMAL_SCALE, 5); + InternalSchema decimalType = + InternalSchema.builder().name("decimal").dataType(InternalType.DECIMAL).isNullable(false) + .metadata(fixedDecimalMetadata).build(); + + Type stringPrimitiveType = Types + .required(PrimitiveTypeName.BINARY).as(LogicalTypeAnnotation.stringType())//.named("string") + .named("string"); + + Type intPrimitiveType = Types + .required(PrimitiveTypeName.INT32).as(LogicalTypeAnnotation.intType(32, false)) + .named("integer"); + + Type decimalPrimitive = Types + .required(PrimitiveTypeName.INT32).as(LogicalTypeAnnotation.decimalType(5, 6)) + .named("decimal"); + + /* Assertions.assertEquals( + primitive1, schemaExtractor.toInternalSchema(intPrimitiveType, null)); + + Assertions.assertEquals( + primitive2, schemaExtractor.toInternalSchema(stringPrimitiveType, null));*/ + + + // Assertions.assertEquals( + // decimalType, schemaExtractor.toInternalSchema(decimalPrimitive, null)); Review Comment: Should these be added back? ########## xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetStatsExtractor.java: ########## @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.parquet; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.*; + +import org.apache.parquet.hadoop.ParquetFileReader; + +import java.util.Collections; +import java.util.List; + +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.xtable.model.stat.ColumnStat; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.ParquetReader; +import org.junit.jupiter.api.Test; +import org.apache.parquet.schema.*; +import org.junit.jupiter.api.Assertions; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.schema.InternalType; +import org.apache.parquet.schema.LogicalTypeAnnotation.IntLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.StringLogicalTypeAnnotation; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.schema.Type.Repetition; +import org.apache.parquet.schema.Type; +import org.apache.parquet.schema.Types; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.GroupType; +import org.apache.xtable.model.schema.InternalField; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.schema.InternalType; +import org.apache.parquet.schema.OriginalType; +import org.apache.parquet.schema.MessageTypeParser; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.xtable.model.stat.Range; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.format.Statistics; +import org.apache.parquet.hadoop.ParquetFileWriter; +import org.apache.xtable.model.storage.InternalDataFile; +import org.apache.parquet.column.statistics.IntStatistics; +import org.apache.parquet.column.statistics.BinaryStatistics; +import org.apache.xtable.model.storage.FileFormat; + +import static org.apache.parquet.column.Encoding.BIT_PACKED; +import static org.apache.parquet.column.Encoding.PLAIN; + +import java.io.File; +import java.util.Map; +import java.util.HashMap; +import java.util.Arrays; +import java.util.ArrayList; +import java.io.IOException; + +import lombok.Builder; +import org.apache.parquet.schema.MessageTypeParser; + + + +import org.apache.xtable.model.stat.Range; + + +public class TestParquetStatsExtractor { + + @Builder.Default + private static final ParquetSchemaExtractor schemaExtractor = + ParquetSchemaExtractor.getInstance(); + + public static ParquetFileReader createParquetFile(File file) throws IOException { + Path path = new Path(file.toURI()); + Configuration configuration = new Configuration(); + + MessageType schema = MessageTypeParser.parseMessageType("message m { required group a {required binary b;}}"); + String[] columnPath = {"a", "b"}; + ColumnDescriptor c1 = schema.getColumnDescription(columnPath); + + byte[] bytes1 = {0, 1, 2, 3}; + byte[] bytes2 = {2, 3, 4, 5}; + CompressionCodecName codec = CompressionCodecName.UNCOMPRESSED; + + // include statics using update() + IntStatistics stats = new IntStatistics(); // or BinaryStatistics + stats.updateStats(1); + stats.updateStats(2); + stats.updateStats(5); + + ParquetFileWriter w = new ParquetFileWriter(configuration, schema, path); + w.start(); + w.startBlock(3); + w.startColumn(c1, 5, codec); + w.writeDataPage(2, 4, BytesInput.from(bytes1), stats, BIT_PACKED, BIT_PACKED, PLAIN); + w.writeDataPage(3, 4, BytesInput.from(bytes1), stats, BIT_PACKED, BIT_PACKED, PLAIN); + w.endColumn(); + w.endBlock(); + w.startBlock(4); + w.startColumn(c1, 8, codec); + w.writeDataPage(7, 4, BytesInput.from(bytes2), stats, BIT_PACKED, BIT_PACKED, PLAIN); + w.endColumn(); + w.endBlock(); + w.end(new HashMap<String, String>()); + return new ParquetFileReader(configuration, path, w.getFooter()); + } + + @Test + public void testToInternalDataFile() { + File file = null; + ParquetFileReader fileReader = null; + InternalDataFile internalDataFile = null; + Configuration configuration = new Configuration(); + + try { + file = new File("./", "test.parquet"); + fileReader = createParquetFile(file); + //statsExtractor toInternalDataFile testing + internalDataFile = ParquetStatsExtractor.toInternalDataFile(configuration, fileReader.getPath()); + } catch (IOException e) { + System.out.println(e); + } + // TODO check if the stats are ok to get as follows + List<ColumnStat> testColumnStats = new ArrayList<>(); + for (BlockMetaData blockMetaData : fileReader.getFooter().getBlocks()) { + List<ColumnChunkMetaData> columns = blockMetaData.getColumns(); + for (ColumnChunkMetaData columnMetaData : columns) { + testColumnStats.add(ColumnStat.builder() + .field(InternalField.builder() + .name(columnMetaData.getPrimitiveType().getName()) + .parentPath(null) + .schema(schemaExtractor.toInternalSchema(columnMetaData.getPrimitiveType(), columnMetaData.getPath().toDotString())) + .build()) + .numValues(columnMetaData.getValueCount()) + .totalSize(columnMetaData.getTotalSize()) + .range(Range.vector(columnMetaData.getStatistics().genericGetMin(), columnMetaData.getStatistics().genericGetMax())) + .build()); + } + } + + InternalDataFile testInternalFile = + InternalDataFile.builder() + .physicalPath("file:/C:/Users/slims/Downloads/XTable/incubator-xtable/xtable-core/test.parquet")//TODO hard coded path to file method + .columnStats(testColumnStats) + .fileFormat(FileFormat.APACHE_PARQUET) + .lastModified(file.lastModified()) + .fileSizeBytes(file.length()) + .recordCount(8)// TODO remove 8 and replace with record count from file + .build(); + + Assertions.assertEquals( + true, InternalDataFile.compareFiles(testInternalFile,internalDataFile)); + } + + @Test + public void main() { + testToInternalDataFile(); + } Review Comment: You don't need this since `testToInternalDataFile` is already annotated with `@Test` ########## xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetStatsExtractor.java: ########## @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.parquet; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.*; + +import org.apache.parquet.hadoop.ParquetFileReader; + +import java.util.Collections; +import java.util.List; + +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.xtable.model.stat.ColumnStat; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.ParquetReader; +import org.junit.jupiter.api.Test; +import org.apache.parquet.schema.*; +import org.junit.jupiter.api.Assertions; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.schema.InternalType; +import org.apache.parquet.schema.LogicalTypeAnnotation.IntLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.StringLogicalTypeAnnotation; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.schema.Type.Repetition; +import org.apache.parquet.schema.Type; +import org.apache.parquet.schema.Types; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.GroupType; +import org.apache.xtable.model.schema.InternalField; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.schema.InternalType; +import org.apache.parquet.schema.OriginalType; +import org.apache.parquet.schema.MessageTypeParser; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.xtable.model.stat.Range; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.format.Statistics; +import org.apache.parquet.hadoop.ParquetFileWriter; +import org.apache.xtable.model.storage.InternalDataFile; +import org.apache.parquet.column.statistics.IntStatistics; +import org.apache.parquet.column.statistics.BinaryStatistics; +import org.apache.xtable.model.storage.FileFormat; + +import static org.apache.parquet.column.Encoding.BIT_PACKED; +import static org.apache.parquet.column.Encoding.PLAIN; + +import java.io.File; +import java.util.Map; +import java.util.HashMap; +import java.util.Arrays; +import java.util.ArrayList; +import java.io.IOException; + +import lombok.Builder; +import org.apache.parquet.schema.MessageTypeParser; + + + +import org.apache.xtable.model.stat.Range; + + +public class TestParquetStatsExtractor { + + @Builder.Default + private static final ParquetSchemaExtractor schemaExtractor = + ParquetSchemaExtractor.getInstance(); + + public static ParquetFileReader createParquetFile(File file) throws IOException { + Path path = new Path(file.toURI()); + Configuration configuration = new Configuration(); + + MessageType schema = MessageTypeParser.parseMessageType("message m { required group a {required binary b;}}"); + String[] columnPath = {"a", "b"}; + ColumnDescriptor c1 = schema.getColumnDescription(columnPath); + + byte[] bytes1 = {0, 1, 2, 3}; + byte[] bytes2 = {2, 3, 4, 5}; + CompressionCodecName codec = CompressionCodecName.UNCOMPRESSED; + + // include statics using update() + IntStatistics stats = new IntStatistics(); // or BinaryStatistics + stats.updateStats(1); + stats.updateStats(2); + stats.updateStats(5); + + ParquetFileWriter w = new ParquetFileWriter(configuration, schema, path); + w.start(); + w.startBlock(3); + w.startColumn(c1, 5, codec); + w.writeDataPage(2, 4, BytesInput.from(bytes1), stats, BIT_PACKED, BIT_PACKED, PLAIN); + w.writeDataPage(3, 4, BytesInput.from(bytes1), stats, BIT_PACKED, BIT_PACKED, PLAIN); + w.endColumn(); + w.endBlock(); + w.startBlock(4); + w.startColumn(c1, 8, codec); + w.writeDataPage(7, 4, BytesInput.from(bytes2), stats, BIT_PACKED, BIT_PACKED, PLAIN); + w.endColumn(); + w.endBlock(); + w.end(new HashMap<String, String>()); + return new ParquetFileReader(configuration, path, w.getFooter()); + } + + @Test + public void testToInternalDataFile() { + File file = null; + ParquetFileReader fileReader = null; + InternalDataFile internalDataFile = null; + Configuration configuration = new Configuration(); + + try { + file = new File("./", "test.parquet"); + fileReader = createParquetFile(file); + //statsExtractor toInternalDataFile testing + internalDataFile = ParquetStatsExtractor.toInternalDataFile(configuration, fileReader.getPath()); + } catch (IOException e) { + System.out.println(e); + } + // TODO check if the stats are ok to get as follows + List<ColumnStat> testColumnStats = new ArrayList<>(); + for (BlockMetaData blockMetaData : fileReader.getFooter().getBlocks()) { + List<ColumnChunkMetaData> columns = blockMetaData.getColumns(); + for (ColumnChunkMetaData columnMetaData : columns) { + testColumnStats.add(ColumnStat.builder() Review Comment: Right now this is just mirroring the implementation. You can just build the expected output manually to ensure that the implementation is matching your expectations. ########## xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetStatsExtractor.java: ########## @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.parquet; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.*; + +import org.apache.parquet.hadoop.ParquetFileReader; + +import java.util.Collections; +import java.util.List; + +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.xtable.model.stat.ColumnStat; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.ParquetReader; +import org.junit.jupiter.api.Test; +import org.apache.parquet.schema.*; +import org.junit.jupiter.api.Assertions; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.schema.InternalType; +import org.apache.parquet.schema.LogicalTypeAnnotation.IntLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.StringLogicalTypeAnnotation; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.schema.Type.Repetition; +import org.apache.parquet.schema.Type; +import org.apache.parquet.schema.Types; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.GroupType; +import org.apache.xtable.model.schema.InternalField; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.schema.InternalType; +import org.apache.parquet.schema.OriginalType; +import org.apache.parquet.schema.MessageTypeParser; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.xtable.model.stat.Range; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.format.Statistics; +import org.apache.parquet.hadoop.ParquetFileWriter; +import org.apache.xtable.model.storage.InternalDataFile; +import org.apache.parquet.column.statistics.IntStatistics; +import org.apache.parquet.column.statistics.BinaryStatistics; +import org.apache.xtable.model.storage.FileFormat; + +import static org.apache.parquet.column.Encoding.BIT_PACKED; +import static org.apache.parquet.column.Encoding.PLAIN; + +import java.io.File; +import java.util.Map; +import java.util.HashMap; +import java.util.Arrays; +import java.util.ArrayList; +import java.io.IOException; + +import lombok.Builder; +import org.apache.parquet.schema.MessageTypeParser; + + + +import org.apache.xtable.model.stat.Range; + + +public class TestParquetStatsExtractor { + + @Builder.Default + private static final ParquetSchemaExtractor schemaExtractor = + ParquetSchemaExtractor.getInstance(); + + public static ParquetFileReader createParquetFile(File file) throws IOException { + Path path = new Path(file.toURI()); + Configuration configuration = new Configuration(); + + MessageType schema = MessageTypeParser.parseMessageType("message m { required group a {required binary b;}}"); + String[] columnPath = {"a", "b"}; + ColumnDescriptor c1 = schema.getColumnDescription(columnPath); + + byte[] bytes1 = {0, 1, 2, 3}; + byte[] bytes2 = {2, 3, 4, 5}; + CompressionCodecName codec = CompressionCodecName.UNCOMPRESSED; + + // include statics using update() + IntStatistics stats = new IntStatistics(); // or BinaryStatistics + stats.updateStats(1); + stats.updateStats(2); + stats.updateStats(5); + + ParquetFileWriter w = new ParquetFileWriter(configuration, schema, path); + w.start(); + w.startBlock(3); + w.startColumn(c1, 5, codec); + w.writeDataPage(2, 4, BytesInput.from(bytes1), stats, BIT_PACKED, BIT_PACKED, PLAIN); + w.writeDataPage(3, 4, BytesInput.from(bytes1), stats, BIT_PACKED, BIT_PACKED, PLAIN); + w.endColumn(); + w.endBlock(); + w.startBlock(4); + w.startColumn(c1, 8, codec); + w.writeDataPage(7, 4, BytesInput.from(bytes2), stats, BIT_PACKED, BIT_PACKED, PLAIN); + w.endColumn(); + w.endBlock(); + w.end(new HashMap<String, String>()); + return new ParquetFileReader(configuration, path, w.getFooter()); + } + + @Test + public void testToInternalDataFile() { + File file = null; + ParquetFileReader fileReader = null; + InternalDataFile internalDataFile = null; + Configuration configuration = new Configuration(); + + try { + file = new File("./", "test.parquet"); Review Comment: You can use a TempDir to create a file in a temporary directory and the test framework will clean it up for you. Check out this [file](https://github.com/apache/incubator-xtable/blob/main/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSource.java#L83) for reference ########## xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetStatsExtractor.java: ########## @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.parquet; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.*; + +import org.apache.parquet.hadoop.ParquetFileReader; + +import java.util.Collections; +import java.util.List; + +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.xtable.model.stat.ColumnStat; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.ParquetReader; +import org.junit.jupiter.api.Test; +import org.apache.parquet.schema.*; +import org.junit.jupiter.api.Assertions; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.schema.InternalType; +import org.apache.parquet.schema.LogicalTypeAnnotation.IntLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.StringLogicalTypeAnnotation; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.schema.Type.Repetition; +import org.apache.parquet.schema.Type; +import org.apache.parquet.schema.Types; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.GroupType; +import org.apache.xtable.model.schema.InternalField; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.schema.InternalType; +import org.apache.parquet.schema.OriginalType; +import org.apache.parquet.schema.MessageTypeParser; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.xtable.model.stat.Range; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.format.Statistics; +import org.apache.parquet.hadoop.ParquetFileWriter; +import org.apache.xtable.model.storage.InternalDataFile; +import org.apache.parquet.column.statistics.IntStatistics; +import org.apache.parquet.column.statistics.BinaryStatistics; +import org.apache.xtable.model.storage.FileFormat; + +import static org.apache.parquet.column.Encoding.BIT_PACKED; +import static org.apache.parquet.column.Encoding.PLAIN; + +import java.io.File; +import java.util.Map; +import java.util.HashMap; +import java.util.Arrays; +import java.util.ArrayList; +import java.io.IOException; + +import lombok.Builder; +import org.apache.parquet.schema.MessageTypeParser; + + + +import org.apache.xtable.model.stat.Range; + + +public class TestParquetStatsExtractor { + + @Builder.Default + private static final ParquetSchemaExtractor schemaExtractor = + ParquetSchemaExtractor.getInstance(); + + public static ParquetFileReader createParquetFile(File file) throws IOException { + Path path = new Path(file.toURI()); + Configuration configuration = new Configuration(); + + MessageType schema = MessageTypeParser.parseMessageType("message m { required group a {required binary b;}}"); Review Comment: We'll need tests for handling the stats of other types to make sure that they are translated properly ########## xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetSchemaExtractor.java: ########## @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.xtable.parquet; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.*; + +import org.junit.jupiter.api.Test; +import org.apache.parquet.schema.*; +import org.junit.jupiter.api.Assertions; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.schema.InternalType; +import org.apache.parquet.schema.LogicalTypeAnnotation.IntLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.StringLogicalTypeAnnotation; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.schema.Type.Repetition; +import org.apache.parquet.schema.Type; +import org.apache.parquet.schema.Types; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.GroupType; +import org.apache.xtable.model.schema.InternalField; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.schema.InternalType; +import org.apache.parquet.schema.OriginalType; + +import java.util.Map; +import java.util.HashMap; +import java.util.Arrays; +import java.util.Collections; + + +public class TestParquetSchemaExtractor { + private static final ParquetSchemaExtractor schemaExtractor = + ParquetSchemaExtractor.getInstance(); + + @Test + public void testPrimitiveTypes() { + + + InternalSchema primitive1 = + InternalSchema.builder().name("integer").dataType(InternalType.INT).build(); + InternalSchema primitive2 = + InternalSchema.builder().name("string").dataType(InternalType.STRING).build(); + + + + Map<InternalSchema.MetadataKey, Object> fixedDecimalMetadata = new HashMap<>(); + fixedDecimalMetadata.put(InternalSchema.MetadataKey.DECIMAL_PRECISION, 6); + fixedDecimalMetadata.put(InternalSchema.MetadataKey.DECIMAL_SCALE, 5); + InternalSchema decimalType = + InternalSchema.builder().name("decimal").dataType(InternalType.DECIMAL).isNullable(false) + .metadata(fixedDecimalMetadata).build(); + + Type stringPrimitiveType = Types + .required(PrimitiveTypeName.BINARY).as(LogicalTypeAnnotation.stringType())//.named("string") + .named("string"); + + Type intPrimitiveType = Types + .required(PrimitiveTypeName.INT32).as(LogicalTypeAnnotation.intType(32, false)) + .named("integer"); + + Type decimalPrimitive = Types + .required(PrimitiveTypeName.INT32).as(LogicalTypeAnnotation.decimalType(5, 6)) + .named("decimal"); + + /* Assertions.assertEquals( + primitive1, schemaExtractor.toInternalSchema(intPrimitiveType, null)); + + Assertions.assertEquals( + primitive2, schemaExtractor.toInternalSchema(stringPrimitiveType, null));*/ + + + // Assertions.assertEquals( + // decimalType, schemaExtractor.toInternalSchema(decimalPrimitive, null)); + + + + + // tests for timestamp and date + InternalSchema testDate = + InternalSchema.builder().name("date").dataType(InternalType.DATE).isNullable(false).build(); + + + // millisMetadata = new HashMap<>(); + Map<InternalSchema.MetadataKey, Object> millisMetadata= Collections.singletonMap(InternalSchema.MetadataKey.TIMESTAMP_PRECISION,InternalSchema.MetadataValue.MILLIS); + Map<InternalSchema.MetadataKey, Object> microsMetadata = Collections.singletonMap(InternalSchema.MetadataKey.TIMESTAMP_PRECISION,InternalSchema.MetadataValue.MICROS); + Map<InternalSchema.MetadataKey, Object> nanosMetadata = Collections.singletonMap(InternalSchema.MetadataKey.TIMESTAMP_PRECISION,InternalSchema.MetadataValue.NANOS); + + InternalSchema testTimestampMillis = + InternalSchema.builder().name("timestamp_millis").dataType(InternalType.TIMESTAMP_NTZ).isNullable(false).metadata(millisMetadata).build(); + + InternalSchema testTimestampMicros = + InternalSchema.builder().name("timestamp_micros").dataType(InternalType.TIMESTAMP).isNullable(false).metadata(microsMetadata).build(); + + InternalSchema testTimestampNanos = + InternalSchema.builder().name("timestamp_nanos").dataType(InternalType.TIMESTAMP_NTZ).isNullable(false).metadata(nanosMetadata).build(); + + Type timestampMillisPrimitiveType = Types + .required(PrimitiveTypeName.INT64).as(LogicalTypeAnnotation.timestampType(false, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("timestamp_millis"); + Type timestampNanosPrimitiveType = Types + .required(PrimitiveTypeName.INT64).as(LogicalTypeAnnotation.timestampType(false, LogicalTypeAnnotation.TimeUnit.NANOS)) + .named("timestamp_nanos"); + //Assertions.assertEquals(testTimestampMillis, schemaExtractor.toInternalSchema(timestampMillisPrimitiveType, null)); + // Assertions.assertEquals(testTimestampNanos, schemaExtractor.toInternalSchema(timestampNanosPrimitiveType, null)); + + + // test date + + Type datePrimitiveType = Types + .required(PrimitiveTypeName.INT32).as(LogicalTypeAnnotation.dateType()) + .named("date"); + Assertions.assertEquals(testDate, schemaExtractor.toInternalSchema(datePrimitiveType, null)); + + } + + + @Test + public void testGrouptTypes() { Review Comment: ```suggestion public void testGroupTypes() { ``` ########## xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetSchemaExtractor.java: ########## @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.xtable.parquet; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.*; + +import org.junit.jupiter.api.Test; +import org.apache.parquet.schema.*; +import org.junit.jupiter.api.Assertions; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.schema.InternalType; +import org.apache.parquet.schema.LogicalTypeAnnotation.IntLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.StringLogicalTypeAnnotation; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.schema.Type.Repetition; +import org.apache.parquet.schema.Type; +import org.apache.parquet.schema.Types; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.GroupType; +import org.apache.xtable.model.schema.InternalField; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.schema.InternalType; +import org.apache.parquet.schema.OriginalType; + +import java.util.Map; +import java.util.HashMap; +import java.util.Arrays; +import java.util.Collections; + + +public class TestParquetSchemaExtractor { + private static final ParquetSchemaExtractor schemaExtractor = + ParquetSchemaExtractor.getInstance(); + + @Test + public void testPrimitiveTypes() { + + + InternalSchema primitive1 = + InternalSchema.builder().name("integer").dataType(InternalType.INT).build(); + InternalSchema primitive2 = + InternalSchema.builder().name("string").dataType(InternalType.STRING).build(); + + + + Map<InternalSchema.MetadataKey, Object> fixedDecimalMetadata = new HashMap<>(); + fixedDecimalMetadata.put(InternalSchema.MetadataKey.DECIMAL_PRECISION, 6); + fixedDecimalMetadata.put(InternalSchema.MetadataKey.DECIMAL_SCALE, 5); + InternalSchema decimalType = + InternalSchema.builder().name("decimal").dataType(InternalType.DECIMAL).isNullable(false) + .metadata(fixedDecimalMetadata).build(); + + Type stringPrimitiveType = Types + .required(PrimitiveTypeName.BINARY).as(LogicalTypeAnnotation.stringType())//.named("string") + .named("string"); + + Type intPrimitiveType = Types + .required(PrimitiveTypeName.INT32).as(LogicalTypeAnnotation.intType(32, false)) + .named("integer"); + + Type decimalPrimitive = Types + .required(PrimitiveTypeName.INT32).as(LogicalTypeAnnotation.decimalType(5, 6)) + .named("decimal"); + + /* Assertions.assertEquals( + primitive1, schemaExtractor.toInternalSchema(intPrimitiveType, null)); + + Assertions.assertEquals( + primitive2, schemaExtractor.toInternalSchema(stringPrimitiveType, null));*/ + + + // Assertions.assertEquals( + // decimalType, schemaExtractor.toInternalSchema(decimalPrimitive, null)); + + + + + // tests for timestamp and date + InternalSchema testDate = + InternalSchema.builder().name("date").dataType(InternalType.DATE).isNullable(false).build(); + + + // millisMetadata = new HashMap<>(); + Map<InternalSchema.MetadataKey, Object> millisMetadata= Collections.singletonMap(InternalSchema.MetadataKey.TIMESTAMP_PRECISION,InternalSchema.MetadataValue.MILLIS); + Map<InternalSchema.MetadataKey, Object> microsMetadata = Collections.singletonMap(InternalSchema.MetadataKey.TIMESTAMP_PRECISION,InternalSchema.MetadataValue.MICROS); + Map<InternalSchema.MetadataKey, Object> nanosMetadata = Collections.singletonMap(InternalSchema.MetadataKey.TIMESTAMP_PRECISION,InternalSchema.MetadataValue.NANOS); + + InternalSchema testTimestampMillis = + InternalSchema.builder().name("timestamp_millis").dataType(InternalType.TIMESTAMP_NTZ).isNullable(false).metadata(millisMetadata).build(); + + InternalSchema testTimestampMicros = + InternalSchema.builder().name("timestamp_micros").dataType(InternalType.TIMESTAMP).isNullable(false).metadata(microsMetadata).build(); + + InternalSchema testTimestampNanos = + InternalSchema.builder().name("timestamp_nanos").dataType(InternalType.TIMESTAMP_NTZ).isNullable(false).metadata(nanosMetadata).build(); + + Type timestampMillisPrimitiveType = Types + .required(PrimitiveTypeName.INT64).as(LogicalTypeAnnotation.timestampType(false, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("timestamp_millis"); + Type timestampNanosPrimitiveType = Types + .required(PrimitiveTypeName.INT64).as(LogicalTypeAnnotation.timestampType(false, LogicalTypeAnnotation.TimeUnit.NANOS)) + .named("timestamp_nanos"); + //Assertions.assertEquals(testTimestampMillis, schemaExtractor.toInternalSchema(timestampMillisPrimitiveType, null)); Review Comment: same note here ########## xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetSchemaExtractor.java: ########## @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.xtable.parquet; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.*; + +import org.junit.jupiter.api.Test; +import org.apache.parquet.schema.*; +import org.junit.jupiter.api.Assertions; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.schema.InternalType; +import org.apache.parquet.schema.LogicalTypeAnnotation.IntLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.StringLogicalTypeAnnotation; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.schema.Type.Repetition; +import org.apache.parquet.schema.Type; +import org.apache.parquet.schema.Types; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.GroupType; +import org.apache.xtable.model.schema.InternalField; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.schema.InternalType; +import org.apache.parquet.schema.OriginalType; + +import java.util.Map; +import java.util.HashMap; +import java.util.Arrays; +import java.util.Collections; + + +public class TestParquetSchemaExtractor { + private static final ParquetSchemaExtractor schemaExtractor = + ParquetSchemaExtractor.getInstance(); + + @Test + public void testPrimitiveTypes() { + + + InternalSchema primitive1 = + InternalSchema.builder().name("integer").dataType(InternalType.INT).build(); + InternalSchema primitive2 = + InternalSchema.builder().name("string").dataType(InternalType.STRING).build(); + + + + Map<InternalSchema.MetadataKey, Object> fixedDecimalMetadata = new HashMap<>(); + fixedDecimalMetadata.put(InternalSchema.MetadataKey.DECIMAL_PRECISION, 6); + fixedDecimalMetadata.put(InternalSchema.MetadataKey.DECIMAL_SCALE, 5); + InternalSchema decimalType = + InternalSchema.builder().name("decimal").dataType(InternalType.DECIMAL).isNullable(false) + .metadata(fixedDecimalMetadata).build(); + + Type stringPrimitiveType = Types + .required(PrimitiveTypeName.BINARY).as(LogicalTypeAnnotation.stringType())//.named("string") + .named("string"); + + Type intPrimitiveType = Types + .required(PrimitiveTypeName.INT32).as(LogicalTypeAnnotation.intType(32, false)) + .named("integer"); + + Type decimalPrimitive = Types + .required(PrimitiveTypeName.INT32).as(LogicalTypeAnnotation.decimalType(5, 6)) + .named("decimal"); + + /* Assertions.assertEquals( + primitive1, schemaExtractor.toInternalSchema(intPrimitiveType, null)); + + Assertions.assertEquals( + primitive2, schemaExtractor.toInternalSchema(stringPrimitiveType, null));*/ + + + // Assertions.assertEquals( + // decimalType, schemaExtractor.toInternalSchema(decimalPrimitive, null)); + + + + + // tests for timestamp and date + InternalSchema testDate = + InternalSchema.builder().name("date").dataType(InternalType.DATE).isNullable(false).build(); + + + // millisMetadata = new HashMap<>(); + Map<InternalSchema.MetadataKey, Object> millisMetadata= Collections.singletonMap(InternalSchema.MetadataKey.TIMESTAMP_PRECISION,InternalSchema.MetadataValue.MILLIS); + Map<InternalSchema.MetadataKey, Object> microsMetadata = Collections.singletonMap(InternalSchema.MetadataKey.TIMESTAMP_PRECISION,InternalSchema.MetadataValue.MICROS); + Map<InternalSchema.MetadataKey, Object> nanosMetadata = Collections.singletonMap(InternalSchema.MetadataKey.TIMESTAMP_PRECISION,InternalSchema.MetadataValue.NANOS); + + InternalSchema testTimestampMillis = + InternalSchema.builder().name("timestamp_millis").dataType(InternalType.TIMESTAMP_NTZ).isNullable(false).metadata(millisMetadata).build(); + + InternalSchema testTimestampMicros = + InternalSchema.builder().name("timestamp_micros").dataType(InternalType.TIMESTAMP).isNullable(false).metadata(microsMetadata).build(); + + InternalSchema testTimestampNanos = + InternalSchema.builder().name("timestamp_nanos").dataType(InternalType.TIMESTAMP_NTZ).isNullable(false).metadata(nanosMetadata).build(); + + Type timestampMillisPrimitiveType = Types + .required(PrimitiveTypeName.INT64).as(LogicalTypeAnnotation.timestampType(false, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("timestamp_millis"); + Type timestampNanosPrimitiveType = Types + .required(PrimitiveTypeName.INT64).as(LogicalTypeAnnotation.timestampType(false, LogicalTypeAnnotation.TimeUnit.NANOS)) + .named("timestamp_nanos"); + //Assertions.assertEquals(testTimestampMillis, schemaExtractor.toInternalSchema(timestampMillisPrimitiveType, null)); + // Assertions.assertEquals(testTimestampNanos, schemaExtractor.toInternalSchema(timestampNanosPrimitiveType, null)); + + + // test date + + Type datePrimitiveType = Types + .required(PrimitiveTypeName.INT32).as(LogicalTypeAnnotation.dateType()) + .named("date"); + Assertions.assertEquals(testDate, schemaExtractor.toInternalSchema(datePrimitiveType, null)); + + } + + + @Test + public void testGrouptTypes() { + + //map + + InternalSchema internalMap = InternalSchema.builder() + .name("map") + .isNullable(false) + .dataType(InternalType.MAP) + .fields( + Arrays.asList( + InternalField.builder() + .name("key") + .parentPath("_one_field_value") + .schema( + InternalSchema.builder() + .name("key") + .dataType(InternalType.FLOAT) + .isNullable(false) + .build()) + .defaultValue(null) + .build(), + InternalField.builder() + .name("value") + .parentPath("_one_field_value") + .schema( + InternalSchema.builder() + .name("value") + .dataType(InternalType.INT) + .isNullable(false) + .build()) + .build())) + .build(); + + + + /* testing from fromInternalSchema()*/ + + GroupType fromSimpleList = Types.requiredList().element(Types.required(PrimitiveTypeName.INT32).as(LogicalTypeAnnotation.intType(32, false)).named(InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME)).named("my_list"); + + InternalSchema fromInternalList = InternalSchema.builder() + .name("my_list") + .isNullable(false) + .dataType(InternalType.LIST) + .fields( + Arrays.asList( + InternalField.builder() + .name(InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME) + .parentPath(null) + .schema( + InternalSchema.builder() + .name(InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME) + .dataType(InternalType.INT) + .isNullable(false) + .build()) + .build())) + .build(); + + GroupType fromTestMap = Types.requiredMap() + .key(Types.primitive(PrimitiveTypeName.FLOAT, Repetition.REQUIRED).named("key")) + .value(Types.primitive(PrimitiveTypeName.INT32, Repetition.REQUIRED).as(LogicalTypeAnnotation.intType(32, false)).named("value")) + .named("map"); + InternalSchema fromInternalMap = InternalSchema.builder() + .name("map") + .isNullable(false) + .dataType(InternalType.MAP) + .fields( + Arrays.asList( + InternalField.builder() + .name("_one_field_key")//"key") + .parentPath("_one_field_value") + .schema( + InternalSchema.builder() + .name("key") + .dataType(InternalType.FLOAT) + .isNullable(false) + .build()) + .defaultValue(null) + .build(), + InternalField.builder() + .name("_one_field_value")//"value") + .parentPath("_one_field_value") + .schema( + InternalSchema.builder() + .name("value") + .dataType(InternalType.INT) + .isNullable(false) + .build()) + .build())) + .build(); + + InternalSchema recordListElementSchema = + InternalSchema.builder() + .name("my_group") + .isNullable(false) + .fields( + Arrays.asList( + InternalField.builder() + .name("id") + .parentPath("my_group") + .schema( + InternalSchema.builder() + .name("id") + .dataType(InternalType.LONG) + .isNullable(false) + .build()) + .build(), + InternalField.builder() + .name("name") + .parentPath("my_group") + .schema( + InternalSchema.builder() + .name("name") + .dataType(InternalType.STRING) + .isNullable(true) + .build()) + .defaultValue(null) + .build())) + .dataType(InternalType.RECORD) + .build(); + InternalSchema internalSchema = + InternalSchema.builder() + .name("my_record") + .dataType(InternalType.RECORD) + .isNullable(true) + .fields( + Arrays.asList( + InternalField.builder() + .name("my_list") + .schema( + InternalSchema.builder() + .name("my_list") + .isNullable(false) + .dataType(InternalType.LIST) + .fields( + Arrays.asList( + InternalField.builder() + .name(InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME) + .parentPath("my_list") + .schema( + InternalSchema.builder() + .name("element") + .dataType(InternalType.INT) + .isNullable(true) + .build()) + .build())) + .build()) + .build(), + InternalField.builder() + .name("my_group") + .schema(recordListElementSchema) + .defaultValue(null) + .build())) + .build(); + + // Assertions.assertEquals( + // fromTestMap, schemaExtractor.fromInternalSchema(fromInternalMap, null)); + // Assertions.assertEquals(fromSimpleList, schemaExtractor.fromInternalSchema(fromInternalList, null)); + + GroupType testGroupType = Types.requiredGroup() + .required(PrimitiveTypeName.INT64).named("id") + .optional(PrimitiveTypeName.BINARY).as(LogicalTypeAnnotation.stringType()).named("name") + //.required(PrimitiveTypeName.INT32).as(LogicalTypeAnnotation.dateType()).named("date") + .named("my_group"); + + + + + GroupType testMap = Types.requiredMap() + .key(Types.primitive(PrimitiveTypeName.FLOAT, Repetition.REQUIRED).named("key")) + .value(Types.primitive(PrimitiveTypeName.INT32, Repetition.REQUIRED).named("value")) + .named("map"); + GroupType listType = Types.requiredList().setElementType(Types.primitive(PrimitiveTypeName.INT32, Repetition.REQUIRED).named("element")).named("my_list"); + MessageType messageType = Types.buildMessage() + //.addField(testMap) + .addField(listType) + .addField(testGroupType) + .named("my_record"); + + // Assertions.assertEquals( + // internalMap, schemaExtractor.toInternalSchema(testMap, null)); + /* Assertions.assertEquals( + internalSchema, schemaExtractor.toInternalSchema(messageType, null));*/ Review Comment: Why is this commented out? The test will not make assertions without this ########## xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetSchemaExtractor.java: ########## @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.xtable.parquet; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.*; + +import org.junit.jupiter.api.Test; +import org.apache.parquet.schema.*; +import org.junit.jupiter.api.Assertions; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.schema.InternalType; +import org.apache.parquet.schema.LogicalTypeAnnotation.IntLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.StringLogicalTypeAnnotation; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.schema.Type.Repetition; +import org.apache.parquet.schema.Type; +import org.apache.parquet.schema.Types; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.GroupType; +import org.apache.xtable.model.schema.InternalField; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.schema.InternalType; +import org.apache.parquet.schema.OriginalType; + +import java.util.Map; +import java.util.HashMap; +import java.util.Arrays; +import java.util.Collections; + + +public class TestParquetSchemaExtractor { + private static final ParquetSchemaExtractor schemaExtractor = + ParquetSchemaExtractor.getInstance(); + + @Test + public void testPrimitiveTypes() { + + + InternalSchema primitive1 = + InternalSchema.builder().name("integer").dataType(InternalType.INT).build(); + InternalSchema primitive2 = + InternalSchema.builder().name("string").dataType(InternalType.STRING).build(); + + + + Map<InternalSchema.MetadataKey, Object> fixedDecimalMetadata = new HashMap<>(); + fixedDecimalMetadata.put(InternalSchema.MetadataKey.DECIMAL_PRECISION, 6); + fixedDecimalMetadata.put(InternalSchema.MetadataKey.DECIMAL_SCALE, 5); + InternalSchema decimalType = + InternalSchema.builder().name("decimal").dataType(InternalType.DECIMAL).isNullable(false) + .metadata(fixedDecimalMetadata).build(); + + Type stringPrimitiveType = Types + .required(PrimitiveTypeName.BINARY).as(LogicalTypeAnnotation.stringType())//.named("string") + .named("string"); + + Type intPrimitiveType = Types + .required(PrimitiveTypeName.INT32).as(LogicalTypeAnnotation.intType(32, false)) + .named("integer"); + + Type decimalPrimitive = Types + .required(PrimitiveTypeName.INT32).as(LogicalTypeAnnotation.decimalType(5, 6)) + .named("decimal"); + + /* Assertions.assertEquals( + primitive1, schemaExtractor.toInternalSchema(intPrimitiveType, null)); + + Assertions.assertEquals( + primitive2, schemaExtractor.toInternalSchema(stringPrimitiveType, null));*/ + + + // Assertions.assertEquals( + // decimalType, schemaExtractor.toInternalSchema(decimalPrimitive, null)); + + + + + // tests for timestamp and date + InternalSchema testDate = + InternalSchema.builder().name("date").dataType(InternalType.DATE).isNullable(false).build(); + + + // millisMetadata = new HashMap<>(); + Map<InternalSchema.MetadataKey, Object> millisMetadata= Collections.singletonMap(InternalSchema.MetadataKey.TIMESTAMP_PRECISION,InternalSchema.MetadataValue.MILLIS); + Map<InternalSchema.MetadataKey, Object> microsMetadata = Collections.singletonMap(InternalSchema.MetadataKey.TIMESTAMP_PRECISION,InternalSchema.MetadataValue.MICROS); + Map<InternalSchema.MetadataKey, Object> nanosMetadata = Collections.singletonMap(InternalSchema.MetadataKey.TIMESTAMP_PRECISION,InternalSchema.MetadataValue.NANOS); + + InternalSchema testTimestampMillis = + InternalSchema.builder().name("timestamp_millis").dataType(InternalType.TIMESTAMP_NTZ).isNullable(false).metadata(millisMetadata).build(); + + InternalSchema testTimestampMicros = + InternalSchema.builder().name("timestamp_micros").dataType(InternalType.TIMESTAMP).isNullable(false).metadata(microsMetadata).build(); + + InternalSchema testTimestampNanos = + InternalSchema.builder().name("timestamp_nanos").dataType(InternalType.TIMESTAMP_NTZ).isNullable(false).metadata(nanosMetadata).build(); + + Type timestampMillisPrimitiveType = Types + .required(PrimitiveTypeName.INT64).as(LogicalTypeAnnotation.timestampType(false, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("timestamp_millis"); + Type timestampNanosPrimitiveType = Types + .required(PrimitiveTypeName.INT64).as(LogicalTypeAnnotation.timestampType(false, LogicalTypeAnnotation.TimeUnit.NANOS)) + .named("timestamp_nanos"); + //Assertions.assertEquals(testTimestampMillis, schemaExtractor.toInternalSchema(timestampMillisPrimitiveType, null)); + // Assertions.assertEquals(testTimestampNanos, schemaExtractor.toInternalSchema(timestampNanosPrimitiveType, null)); + + + // test date + + Type datePrimitiveType = Types + .required(PrimitiveTypeName.INT32).as(LogicalTypeAnnotation.dateType()) + .named("date"); + Assertions.assertEquals(testDate, schemaExtractor.toInternalSchema(datePrimitiveType, null)); + + } + + + @Test + public void testGrouptTypes() { + + //map + + InternalSchema internalMap = InternalSchema.builder() + .name("map") + .isNullable(false) + .dataType(InternalType.MAP) + .fields( + Arrays.asList( + InternalField.builder() + .name("key") + .parentPath("_one_field_value") + .schema( + InternalSchema.builder() + .name("key") + .dataType(InternalType.FLOAT) + .isNullable(false) + .build()) + .defaultValue(null) + .build(), + InternalField.builder() + .name("value") + .parentPath("_one_field_value") + .schema( + InternalSchema.builder() + .name("value") + .dataType(InternalType.INT) + .isNullable(false) + .build()) + .build())) + .build(); + + + + /* testing from fromInternalSchema()*/ + + GroupType fromSimpleList = Types.requiredList().element(Types.required(PrimitiveTypeName.INT32).as(LogicalTypeAnnotation.intType(32, false)).named(InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME)).named("my_list"); + + InternalSchema fromInternalList = InternalSchema.builder() + .name("my_list") + .isNullable(false) + .dataType(InternalType.LIST) + .fields( + Arrays.asList( + InternalField.builder() + .name(InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME) + .parentPath(null) + .schema( + InternalSchema.builder() + .name(InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME) + .dataType(InternalType.INT) + .isNullable(false) + .build()) + .build())) + .build(); + + GroupType fromTestMap = Types.requiredMap() + .key(Types.primitive(PrimitiveTypeName.FLOAT, Repetition.REQUIRED).named("key")) + .value(Types.primitive(PrimitiveTypeName.INT32, Repetition.REQUIRED).as(LogicalTypeAnnotation.intType(32, false)).named("value")) + .named("map"); + InternalSchema fromInternalMap = InternalSchema.builder() + .name("map") + .isNullable(false) + .dataType(InternalType.MAP) + .fields( + Arrays.asList( + InternalField.builder() + .name("_one_field_key")//"key") + .parentPath("_one_field_value") + .schema( + InternalSchema.builder() + .name("key") + .dataType(InternalType.FLOAT) + .isNullable(false) + .build()) + .defaultValue(null) + .build(), + InternalField.builder() + .name("_one_field_value")//"value") + .parentPath("_one_field_value") + .schema( + InternalSchema.builder() + .name("value") + .dataType(InternalType.INT) + .isNullable(false) + .build()) + .build())) + .build(); + + InternalSchema recordListElementSchema = + InternalSchema.builder() + .name("my_group") + .isNullable(false) + .fields( + Arrays.asList( + InternalField.builder() + .name("id") + .parentPath("my_group") + .schema( + InternalSchema.builder() + .name("id") + .dataType(InternalType.LONG) + .isNullable(false) + .build()) + .build(), + InternalField.builder() + .name("name") + .parentPath("my_group") + .schema( + InternalSchema.builder() + .name("name") + .dataType(InternalType.STRING) + .isNullable(true) + .build()) + .defaultValue(null) + .build())) + .dataType(InternalType.RECORD) + .build(); + InternalSchema internalSchema = + InternalSchema.builder() + .name("my_record") + .dataType(InternalType.RECORD) + .isNullable(true) + .fields( + Arrays.asList( + InternalField.builder() + .name("my_list") + .schema( + InternalSchema.builder() + .name("my_list") + .isNullable(false) + .dataType(InternalType.LIST) + .fields( + Arrays.asList( + InternalField.builder() + .name(InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME) + .parentPath("my_list") + .schema( + InternalSchema.builder() + .name("element") + .dataType(InternalType.INT) + .isNullable(true) + .build()) + .build())) + .build()) + .build(), + InternalField.builder() + .name("my_group") + .schema(recordListElementSchema) + .defaultValue(null) + .build())) + .build(); + + // Assertions.assertEquals( + // fromTestMap, schemaExtractor.fromInternalSchema(fromInternalMap, null)); + // Assertions.assertEquals(fromSimpleList, schemaExtractor.fromInternalSchema(fromInternalList, null)); + + GroupType testGroupType = Types.requiredGroup() + .required(PrimitiveTypeName.INT64).named("id") + .optional(PrimitiveTypeName.BINARY).as(LogicalTypeAnnotation.stringType()).named("name") + //.required(PrimitiveTypeName.INT32).as(LogicalTypeAnnotation.dateType()).named("date") + .named("my_group"); + + + + + GroupType testMap = Types.requiredMap() + .key(Types.primitive(PrimitiveTypeName.FLOAT, Repetition.REQUIRED).named("key")) + .value(Types.primitive(PrimitiveTypeName.INT32, Repetition.REQUIRED).named("value")) + .named("map"); + GroupType listType = Types.requiredList().setElementType(Types.primitive(PrimitiveTypeName.INT32, Repetition.REQUIRED).named("element")).named("my_list"); + MessageType messageType = Types.buildMessage() + //.addField(testMap) + .addField(listType) + .addField(testGroupType) + .named("my_record"); + + // Assertions.assertEquals( + // internalMap, schemaExtractor.toInternalSchema(testMap, null)); + /* Assertions.assertEquals( + internalSchema, schemaExtractor.toInternalSchema(messageType, null));*/ + } + + @Test + public void main() { + testPrimitiveTypes(); + testGrouptTypes(); + } Review Comment: This is not needed ########## xtable-core/src/main/java/org/apache/xtable/parquet/ParquetStatsExtractor.java: ########## @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.parquet; + +import org.apache.xtable.model.schema.InternalSchema; + +import java.util.Collection; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Map; +import java.util.HashMap; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.TreeSet; +import java.util.Optional; + +import org.apache.xtable.model.stat.PartitionValue; +import org.apache.xtable.model.stat.ColumnStat; +import org.apache.xtable.model.stat.Range; +import lombok.Builder; +import org.apache.xtable.model.schema.InternalField; +import lombok.Value; +import org.apache.xtable.model.storage.InternalDataFile; +import org.apache.hadoop.fs.*; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.Encoding; +import org.apache.parquet.column.statistics.Statistics; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.schema.MessageType; +import org.apache.xtable.model.storage.FileFormat; +import org.apache.xtable.model.config.InputPartitionFields; +import org.apache.hadoop.conf.Configuration; + +@Value +@Builder +public class ParquetStatsExtractor { + + private static final ParquetStatsExtractor INSTANCE = null; // new ParquetStatsExtractor(); + @Builder.Default + private static final ParquetPartitionValueExtractor partitionExtractor = + ParquetPartitionValueExtractor.getInstance(); + @Builder.Default + private static final ParquetSchemaExtractor schemaExtractor = + ParquetSchemaExtractor.getInstance(); + @Builder.Default + private static final ParquetMetadataExtractor parquetMetadataExtractor = + ParquetMetadataExtractor.getInstance(); + + private static final InputPartitionFields partitions = null; + + public static ParquetStatsExtractor getInstance() { + return INSTANCE; + } + + public static List<ColumnStat> getColumnStatsForaFile(ParquetMetadata footer) { + return getStatsForaFile(footer).values().stream() + .flatMap(List::stream) + .collect(Collectors.toList()); + } + + private static Optional<Long> getMaxFromColumnStats(List<ColumnStat> columnStats) { + return columnStats.stream() + .filter(entry -> entry.getField().getParentPath() == null) + .map(ColumnStat::getNumValues) + .filter(numValues -> numValues > 0) + .max(Long::compareTo); + } + + + public static Map<ColumnDescriptor, List<ColumnStat>> getStatsForaFile(ParquetMetadata footer) { Review Comment: Let's update the name to be `getStatsForFile` The method can also take in a schema so you only have to extract a schema once for a set of files. Then you can use the `InternalField` objects within that schema instead of building them in this class. ########## xtable-core/src/main/java/org/apache/xtable/parquet/ParquetStatsExtractor.java: ########## @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.parquet; + +import org.apache.xtable.model.schema.InternalSchema; + +import java.util.Collection; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Map; +import java.util.HashMap; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.TreeSet; +import java.util.Optional; + +import org.apache.xtable.model.stat.PartitionValue; +import org.apache.xtable.model.stat.ColumnStat; +import org.apache.xtable.model.stat.Range; +import lombok.Builder; +import org.apache.xtable.model.schema.InternalField; +import lombok.Value; +import org.apache.xtable.model.storage.InternalDataFile; +import org.apache.hadoop.fs.*; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.Encoding; +import org.apache.parquet.column.statistics.Statistics; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.schema.MessageType; +import org.apache.xtable.model.storage.FileFormat; +import org.apache.xtable.model.config.InputPartitionFields; +import org.apache.hadoop.conf.Configuration; + +@Value +@Builder +public class ParquetStatsExtractor { + + private static final ParquetStatsExtractor INSTANCE = null; // new ParquetStatsExtractor(); + @Builder.Default + private static final ParquetPartitionValueExtractor partitionExtractor = + ParquetPartitionValueExtractor.getInstance(); + @Builder.Default + private static final ParquetSchemaExtractor schemaExtractor = + ParquetSchemaExtractor.getInstance(); + @Builder.Default + private static final ParquetMetadataExtractor parquetMetadataExtractor = + ParquetMetadataExtractor.getInstance(); + + private static final InputPartitionFields partitions = null; + + public static ParquetStatsExtractor getInstance() { + return INSTANCE; + } + + public static List<ColumnStat> getColumnStatsForaFile(ParquetMetadata footer) { + return getStatsForaFile(footer).values().stream() + .flatMap(List::stream) + .collect(Collectors.toList()); + } + + private static Optional<Long> getMaxFromColumnStats(List<ColumnStat> columnStats) { + return columnStats.stream() + .filter(entry -> entry.getField().getParentPath() == null) + .map(ColumnStat::getNumValues) + .filter(numValues -> numValues > 0) + .max(Long::compareTo); + } + + + public static Map<ColumnDescriptor, List<ColumnStat>> getStatsForaFile(ParquetMetadata footer) { + List<ColumnStat> colStat = new ArrayList<ColumnStat>(); + Map<ColumnDescriptor, List<ColumnStat>> columnDescStats = new HashMap<>(); + MessageType schema = parquetMetadataExtractor.getSchema(footer); + List<ColumnChunkMetaData> columns = new ArrayList<>(); + for (BlockMetaData blockMetaData: footer.getBlocks()){ + columns.addAll(blockMetaData.getColumns()); + } + columnDescStats = + columns + .stream() + .collect(Collectors.groupingBy(columnMetaData -> schema.getColumnDescription(columnMetaData.getPath().toArray()), + Collectors.mapping(columnMetaData -> ColumnStat.builder() + .field(InternalField.builder() + .name(columnMetaData.getPrimitiveType().getName()) + .fieldId(columnMetaData.getPrimitiveType().getId() == null ? null : columnMetaData.getPrimitiveType().getId().intValue()) + .parentPath(null) + .schema(schemaExtractor.toInternalSchema(columnMetaData.getPrimitiveType(), columnMetaData.getPath().toDotString())) + .build()) + .numValues(columnMetaData.getValueCount()) + .totalSize(columnMetaData.getTotalSize()) + .range(Range.vector(columnMetaData.getStatistics().getMinBytes()[0], columnMetaData.getStatistics().getMaxBytes()[0]))// TODO convert byte array into numerical representation Review Comment: Just using the first byte does not seem valid ########## xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetStatsExtractor.java: ########## @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.parquet; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.*; + +import org.apache.parquet.hadoop.ParquetFileReader; + +import java.util.Collections; +import java.util.List; + +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.xtable.model.stat.ColumnStat; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.ParquetReader; +import org.junit.jupiter.api.Test; +import org.apache.parquet.schema.*; +import org.junit.jupiter.api.Assertions; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.schema.InternalType; +import org.apache.parquet.schema.LogicalTypeAnnotation.IntLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.StringLogicalTypeAnnotation; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.schema.Type.Repetition; +import org.apache.parquet.schema.Type; +import org.apache.parquet.schema.Types; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.GroupType; +import org.apache.xtable.model.schema.InternalField; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.schema.InternalType; +import org.apache.parquet.schema.OriginalType; +import org.apache.parquet.schema.MessageTypeParser; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.xtable.model.stat.Range; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.format.Statistics; +import org.apache.parquet.hadoop.ParquetFileWriter; +import org.apache.xtable.model.storage.InternalDataFile; +import org.apache.parquet.column.statistics.IntStatistics; +import org.apache.parquet.column.statistics.BinaryStatistics; +import org.apache.xtable.model.storage.FileFormat; + +import static org.apache.parquet.column.Encoding.BIT_PACKED; +import static org.apache.parquet.column.Encoding.PLAIN; + +import java.io.File; +import java.util.Map; +import java.util.HashMap; +import java.util.Arrays; +import java.util.ArrayList; +import java.io.IOException; + +import lombok.Builder; +import org.apache.parquet.schema.MessageTypeParser; + + + +import org.apache.xtable.model.stat.Range; + + +public class TestParquetStatsExtractor { + + @Builder.Default + private static final ParquetSchemaExtractor schemaExtractor = + ParquetSchemaExtractor.getInstance(); + + public static ParquetFileReader createParquetFile(File file) throws IOException { + Path path = new Path(file.toURI()); + Configuration configuration = new Configuration(); + + MessageType schema = MessageTypeParser.parseMessageType("message m { required group a {required binary b;}}"); + String[] columnPath = {"a", "b"}; + ColumnDescriptor c1 = schema.getColumnDescription(columnPath); + + byte[] bytes1 = {0, 1, 2, 3}; + byte[] bytes2 = {2, 3, 4, 5}; + CompressionCodecName codec = CompressionCodecName.UNCOMPRESSED; + + // include statics using update() + IntStatistics stats = new IntStatistics(); // or BinaryStatistics + stats.updateStats(1); + stats.updateStats(2); + stats.updateStats(5); + + ParquetFileWriter w = new ParquetFileWriter(configuration, schema, path); + w.start(); + w.startBlock(3); + w.startColumn(c1, 5, codec); + w.writeDataPage(2, 4, BytesInput.from(bytes1), stats, BIT_PACKED, BIT_PACKED, PLAIN); + w.writeDataPage(3, 4, BytesInput.from(bytes1), stats, BIT_PACKED, BIT_PACKED, PLAIN); + w.endColumn(); + w.endBlock(); + w.startBlock(4); + w.startColumn(c1, 8, codec); + w.writeDataPage(7, 4, BytesInput.from(bytes2), stats, BIT_PACKED, BIT_PACKED, PLAIN); + w.endColumn(); + w.endBlock(); + w.end(new HashMap<String, String>()); + return new ParquetFileReader(configuration, path, w.getFooter()); + } + + @Test + public void testToInternalDataFile() { + File file = null; + ParquetFileReader fileReader = null; + InternalDataFile internalDataFile = null; + Configuration configuration = new Configuration(); + + try { + file = new File("./", "test.parquet"); + fileReader = createParquetFile(file); + //statsExtractor toInternalDataFile testing + internalDataFile = ParquetStatsExtractor.toInternalDataFile(configuration, fileReader.getPath()); + } catch (IOException e) { + System.out.println(e); Review Comment: Instead of printing, you can just declares the method `throws IOException` ########## xtable-core/pom.xml: ########## @@ -61,6 +61,12 @@ <groupId>org.apache.avro</groupId> <artifactId>avro</artifactId> </dependency> + <!-- https://mvnrepository.com/artifact/org.apache.parquet/parquet-avro --> + <dependency> + <groupId>org.apache.parquet</groupId> + <artifactId>parquet-avro</artifactId> + <version>1.15.0</version> Review Comment: @unical1988 please address this and also upgrade to 1.15.1 while you are at it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@xtable.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org