the-other-tim-brown commented on code in PR #669: URL: https://github.com/apache/incubator-xtable/pull/669#discussion_r2035449171
########## xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetSchemaExtractor.java: ########## @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.xtable.parquet; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.*; + +import org.junit.jupiter.api.Test; +import org.apache.parquet.schema.*; +import org.junit.jupiter.api.Assertions; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.schema.InternalType; +import org.apache.parquet.schema.LogicalTypeAnnotation.IntLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.StringLogicalTypeAnnotation; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.schema.Type.Repetition; +import org.apache.parquet.schema.Type; +import org.apache.parquet.schema.Types; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.GroupType; +import org.apache.xtable.model.schema.InternalField; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.schema.InternalType; +import org.apache.parquet.schema.OriginalType; + 
+import java.util.Map; +import java.util.HashMap; +import java.util.Arrays; + + +public class TestParquetSchemaExtractor { + private static final ParquetSchemaExtractor schemaExtractor = + ParquetSchemaExtractor.getInstance(); + + @Test + public void testPrimitiveTypes() { Review Comment: Can you split this into two tests: one for the primitive types and one for the lists/maps/records? ########## xtable-core/src/main/java/org/apache/xtable/parquet/ParquetSchemaExtractor.java: ########## @@ -0,0 +1,535 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.xtable.parquet; + +import java.util.HashMap; +import java.util.Map; +import java.util.List; +import java.util.ArrayList; + +import org.apache.xtable.schema.SchemaUtils; +import org.apache.xtable.exception.SchemaExtractorException; + +import java.util.Collections; +import java.util.Optional; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; + +import java.util.Arrays; +import java.util.Collections; +import java.util.function.Function; +import java.util.stream.Collectors; + +import org.apache.xtable.hudi.idtracking.models.IdMapping; +import org.apache.avro.Schema; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; +import org.apache.xtable.collectors.CustomCollectors; +import org.apache.xtable.exception.UnsupportedSchemaTypeException; +import org.apache.xtable.model.schema.InternalField; +import org.apache.parquet.schema.Type.Repetition; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.schema.InternalType; +import org.apache.parquet.schema.Type.ID; +import org.apache.parquet.schema.OriginalType; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.schema.Types; +import org.apache.parquet.column.ColumnDescriptor; + + +/** + * Class that converts parquet Schema {@link Schema} to Canonical Schema {@link InternalSchema} and + * vice-versa. This conversion is fully reversible and there is a strict 1 to 1 mapping between + * parquet data types and canonical data types. 
+ */ +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class ParquetSchemaExtractor { + // parquet only supports string keys in maps + private static final InternalField MAP_KEY_FIELD = + InternalField.builder() + .name(InternalField.Constants.MAP_KEY_FIELD_NAME) + .schema( + InternalSchema.builder() + .name("map_key") + .dataType(InternalType.STRING) + .isNullable(false) + .build()) + .defaultValue("") + .build(); + private static final ParquetSchemaExtractor INSTANCE = new ParquetSchemaExtractor(); + private static final String ELEMENT = "element"; + private static final String KEY = "key"; + private static final String VALUE = "value"; + + public static ParquetSchemaExtractor getInstance() { + return INSTANCE; + } + + private static boolean groupTypeIsNullable(Type schema) { + return schema.getRepetition() == Repetition.REQUIRED ? false : true; + } + + private static boolean groupTypeContainsNull(Type schema) { + if (!schema.isPrimitive()) { + for (Type field : schema.asGroupType().getFields()) { + if (field/*.getLogicalTypeAnnotation().toOriginalType()*/ == null) { + return true; + } + } + } else { + if (schema.equals(null)) { + return true; + } + } + + return false; + } + + /* private static LogicalTypeAnnotation finalizeSchema(LogicalTypeAnnotation targetSchema, InternalSchema inputSchema) { + if (inputSchema.isNullable()) { + return targetSchema.union(null); // LogicalTypeAnnotation.unknownType() + } + return targetSchema; + }*/ + + /** + * Converts the parquet {@link Schema} to {@link InternalSchema}. + * + * @param schema The schema being converted + * @param parentPath If this schema is nested within another, this will be a dot separated string + * representing the path from the top most field to the current schema. 
+ * @return a converted schema + */ + public InternalSchema toInternalSchema( + Type schema, String parentPath) { + // TODO - Does not handle recursion in parquet schema + InternalType newDataType = null; + Type.Repetition currentRepetition = null; + List<InternalField> subFields = new ArrayList<>(); + PrimitiveType primitiveType; + LogicalTypeAnnotation logicalType; + Map<InternalSchema.MetadataKey, Object> metadata = new HashMap<>(); + String elementName = schema.getName(); + if (schema.isPrimitive()) { + primitiveType = schema.asPrimitiveType(); + switch (primitiveType.getPrimitiveTypeName()) { + // PrimitiveTypes + case INT64: + logicalType = schema.getLogicalTypeAnnotation(); + if (logicalType instanceof LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) { + LogicalTypeAnnotation.TimeUnit timeUnit = + ((LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) logicalType).getUnit(); + if (timeUnit == LogicalTypeAnnotation.TimeUnit.MICROS) { + newDataType = InternalType.TIMESTAMP; + metadata.put( + InternalSchema.MetadataKey.TIMESTAMP_PRECISION, + InternalSchema.MetadataValue.MICROS); + } else if (timeUnit == LogicalTypeAnnotation.TimeUnit.MILLIS) { + newDataType = InternalType.TIMESTAMP_NTZ; Review Comment: The unit being millis does not imply that this is not adjusted to UTC. It looks like there is an `isAdjustedToUTC()` method, similar to `getUnit()`, that you can use to determine this. ########## xtable-core/src/main/java/org/apache/xtable/parquet/ParquetSchemaExtractor.java: ########## @@ -0,0 +1,535 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.parquet; + +import java.util.HashMap; +import java.util.Map; +import java.util.List; +import java.util.ArrayList; + +import org.apache.xtable.schema.SchemaUtils; +import org.apache.xtable.exception.SchemaExtractorException; + +import java.util.Collections; +import java.util.Optional; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; + +import java.util.Arrays; +import java.util.Collections; +import java.util.function.Function; +import java.util.stream.Collectors; + +import org.apache.xtable.hudi.idtracking.models.IdMapping; +import org.apache.avro.Schema; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; +import org.apache.xtable.collectors.CustomCollectors; +import org.apache.xtable.exception.UnsupportedSchemaTypeException; +import org.apache.xtable.model.schema.InternalField; +import org.apache.parquet.schema.Type.Repetition; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.schema.InternalType; +import org.apache.parquet.schema.Type.ID; +import org.apache.parquet.schema.OriginalType; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.schema.Types; +import org.apache.parquet.column.ColumnDescriptor; + + +/** + * Class that converts parquet Schema {@link Schema} to Canonical Schema {@link InternalSchema} and + * vice-versa. 
This conversion is fully reversible and there is a strict 1 to 1 mapping between + * parquet data types and canonical data types. + */ +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class ParquetSchemaExtractor { + // parquet only supports string keys in maps + private static final InternalField MAP_KEY_FIELD = + InternalField.builder() + .name(InternalField.Constants.MAP_KEY_FIELD_NAME) + .schema( + InternalSchema.builder() + .name("map_key") + .dataType(InternalType.STRING) + .isNullable(false) + .build()) + .defaultValue("") + .build(); + private static final ParquetSchemaExtractor INSTANCE = new ParquetSchemaExtractor(); + private static final String ELEMENT = "element"; + private static final String KEY = "key"; + private static final String VALUE = "value"; + + public static ParquetSchemaExtractor getInstance() { + return INSTANCE; + } + + private static boolean groupTypeIsNullable(Type schema) { + return schema.getRepetition() == Repetition.REQUIRED ? false : true; + } + + private static boolean groupTypeContainsNull(Type schema) { Review Comment: This is unused, let's remove it ########## xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetSchemaExtractor.java: ########## @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.xtable.parquet; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.*; + +import org.junit.jupiter.api.Test; +import org.apache.parquet.schema.*; +import org.junit.jupiter.api.Assertions; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.schema.InternalType; +import org.apache.parquet.schema.LogicalTypeAnnotation.IntLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.StringLogicalTypeAnnotation; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.schema.Type.Repetition; +import org.apache.parquet.schema.Type; +import org.apache.parquet.schema.Types; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.GroupType; +import org.apache.xtable.model.schema.InternalField; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.schema.InternalType; +import org.apache.parquet.schema.OriginalType; + +import java.util.Map; +import java.util.HashMap; +import java.util.Arrays; + + +public class TestParquetSchemaExtractor { + private static final ParquetSchemaExtractor schemaExtractor = + ParquetSchemaExtractor.getInstance(); + + @Test + public void testPrimitiveTypes() { + + InternalSchema simpleList = InternalSchema.builder() + .name("my_list") + .isNullable(false) + .dataType(InternalType.LIST) + .fields( + Arrays.asList( + InternalField.builder() + .name(InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME) + .parentPath(null) + .schema( + InternalSchema.builder() + .name("element") + .dataType(InternalType.INT) + .isNullable(false) + .build()) + .build())) + .build(); + + + //map + + InternalSchema internalMap = InternalSchema.builder() + .name("map") + .isNullable(false) + .dataType(InternalType.MAP) + .fields( + 
Arrays.asList( + InternalField.builder() + .name("key") + .parentPath("_one_field_value") + .schema( + InternalSchema.builder() + .name("key") + .dataType(InternalType.FLOAT) + .isNullable(false) + .build()) + .defaultValue(null) + .build(), + InternalField.builder() + .name("value") + .parentPath("_one_field_value") + .schema( + InternalSchema.builder() + .name("value") + .dataType(InternalType.INT) + .isNullable(false) + .build()) + .build())) + .build(); + + + InternalSchema primitive1 = + InternalSchema.builder().name("integer").dataType(InternalType.INT).build(); + InternalSchema primitive2 = + InternalSchema.builder().name("string").dataType(InternalType.STRING).build(); + InternalSchema group1 = + InternalSchema.builder().name("list").dataType(InternalType.LIST).build(); + InternalSchema recordListElementSchema = + InternalSchema.builder() + .name("my_group") + .isNullable(false) + .fields( + Arrays.asList( + InternalField.builder() + .name("id") + .parentPath("my_group") + .schema( + InternalSchema.builder() + .name("id") + .dataType(InternalType.LONG) + .isNullable(false) + .build()) + .build(), + InternalField.builder() + .name("name") + .parentPath("my_group") + .schema( + InternalSchema.builder() + .name("name") + .dataType(InternalType.STRING) + .isNullable(true) + .build()) + .defaultValue(null) + .build())) + .dataType(InternalType.RECORD) + .build(); + InternalSchema internalSchema = + InternalSchema.builder() + .name("my_record") + .dataType(InternalType.RECORD) + .isNullable(true) + .fields( + Arrays.asList( + InternalField.builder() + .name("my_list") + .schema( + InternalSchema.builder() + .name("my_list") + .isNullable(false) + .dataType(InternalType.LIST) + .fields( + Arrays.asList( + InternalField.builder() + .name(InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME) + .parentPath("my_list") + .schema( + InternalSchema.builder() + .name("element") + .dataType(InternalType.INT) + .isNullable(true) + .build()) + .build())) + .build()) + .build(), 
+ InternalField.builder() + .name("my_group") + .schema(recordListElementSchema) + /*InternalSchema.builder() + .name("array") + .isNullable(true) + .dataType(InternalType.RECORD) + .fields( + Arrays.asList( + InternalField.builder() + .name(InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME) + .parentPath("my_group") + .schema(recordListElementSchema) + .build())) + .build())*/ + .defaultValue(null) + .build())) + .build(); + + + Map<InternalSchema.MetadataKey, Object> fixedDecimalMetadata = new HashMap<>(); + fixedDecimalMetadata.put(InternalSchema.MetadataKey.DECIMAL_PRECISION, 6); + fixedDecimalMetadata.put(InternalSchema.MetadataKey.DECIMAL_SCALE, 5); + InternalSchema decimalType = + InternalSchema.builder().name("decimal").dataType(InternalType.DECIMAL).isNullable(false) + .metadata(fixedDecimalMetadata).build(); + + Type stringPrimitiveType = Types + .required(PrimitiveTypeName.BINARY).as(LogicalTypeAnnotation.stringType())//.named("string") + .named("string"); + + Type intPrimitiveType = Types + .required(PrimitiveTypeName.INT32).as(LogicalTypeAnnotation.intType(32, false)) + .named("integer"); + + Type decimalPrimitive = Types + .required(PrimitiveTypeName.INT32).as(LogicalTypeAnnotation.decimalType(5, 6)) + .named("decimal"); + + /* Assertions.assertEquals( + primitive1, schemaExtractor.toInternalSchema(intPrimitiveType, null)); + + Assertions.assertEquals( + primitive2, schemaExtractor.toInternalSchema(stringPrimitiveType, null));*/ + + GroupType testGroupType = Types.requiredGroup() + .required(PrimitiveTypeName.INT64).named("id") + .optional(PrimitiveTypeName.BINARY).as(LogicalTypeAnnotation.stringType()).named("name") + //.required(PrimitiveTypeName.INT32).as(LogicalTypeAnnotation.dateType()).named("date") + .named("my_group"); + + /* GroupType nestedGroupType = Types.requiredGroup() + .required(INT64).named("id") + .optional(BINARY).as(UTF8).named("email") + .optionalGroup() + .required(BINARY).as(UTF8).named("street") + 
.required(INT32).named("zipcode") + .named("address") + .named("User");*/ + + GroupType testMap = Types.requiredMap() + .key(Types.primitive(PrimitiveTypeName.FLOAT, Repetition.REQUIRED).named("key")) + .value(Types.primitive(PrimitiveTypeName.INT32, Repetition.REQUIRED).named("value")) + .named("map"); + GroupType listType = Types.requiredList().setElementType(Types.primitive(PrimitiveTypeName.INT32, Repetition.REQUIRED).named("element")).named("my_list"); + MessageType messageType = Types.buildMessage() + //.addField(testMap) + .addField(listType) + .addField(testGroupType) + .named("my_record"); + + /*GroupType nestedList = Types.requiredList() Review Comment: Can this be cleaned up? ########## xtable-core/src/main/java/org/apache/xtable/parquet/ParquetSchemaExtractor.java: ########## @@ -0,0 +1,535 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.xtable.parquet; + +import java.util.HashMap; +import java.util.Map; +import java.util.List; +import java.util.ArrayList; + +import org.apache.xtable.schema.SchemaUtils; +import org.apache.xtable.exception.SchemaExtractorException; + +import java.util.Collections; +import java.util.Optional; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; + +import java.util.Arrays; +import java.util.Collections; +import java.util.function.Function; +import java.util.stream.Collectors; + +import org.apache.xtable.hudi.idtracking.models.IdMapping; +import org.apache.avro.Schema; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; +import org.apache.xtable.collectors.CustomCollectors; +import org.apache.xtable.exception.UnsupportedSchemaTypeException; +import org.apache.xtable.model.schema.InternalField; +import org.apache.parquet.schema.Type.Repetition; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.schema.InternalType; +import org.apache.parquet.schema.Type.ID; +import org.apache.parquet.schema.OriginalType; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.schema.Types; +import org.apache.parquet.column.ColumnDescriptor; + + +/** + * Class that converts parquet Schema {@link Schema} to Canonical Schema {@link InternalSchema} and + * vice-versa. This conversion is fully reversible and there is a strict 1 to 1 mapping between + * parquet data types and canonical data types. 
+ */ +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class ParquetSchemaExtractor { + // parquet only supports string keys in maps + private static final InternalField MAP_KEY_FIELD = + InternalField.builder() + .name(InternalField.Constants.MAP_KEY_FIELD_NAME) + .schema( + InternalSchema.builder() + .name("map_key") + .dataType(InternalType.STRING) + .isNullable(false) + .build()) + .defaultValue("") + .build(); + private static final ParquetSchemaExtractor INSTANCE = new ParquetSchemaExtractor(); + private static final String ELEMENT = "element"; + private static final String KEY = "key"; + private static final String VALUE = "value"; + + public static ParquetSchemaExtractor getInstance() { + return INSTANCE; + } + + private static boolean groupTypeIsNullable(Type schema) { + return schema.getRepetition() == Repetition.REQUIRED ? false : true; + } + + private static boolean groupTypeContainsNull(Type schema) { + if (!schema.isPrimitive()) { + for (Type field : schema.asGroupType().getFields()) { + if (field/*.getLogicalTypeAnnotation().toOriginalType()*/ == null) { + return true; + } + } + } else { + if (schema.equals(null)) { + return true; + } + } + + return false; + } + + /* private static LogicalTypeAnnotation finalizeSchema(LogicalTypeAnnotation targetSchema, InternalSchema inputSchema) { Review Comment: Can you clean up this commented out code? ########## xtable-core/src/main/java/org/apache/xtable/parquet/ParquetSchemaExtractor.java: ########## @@ -0,0 +1,535 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.parquet; + +import java.util.HashMap; +import java.util.Map; +import java.util.List; +import java.util.ArrayList; + +import org.apache.xtable.schema.SchemaUtils; +import org.apache.xtable.exception.SchemaExtractorException; + +import java.util.Collections; +import java.util.Optional; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; + +import java.util.Arrays; +import java.util.Collections; +import java.util.function.Function; +import java.util.stream.Collectors; + +import org.apache.xtable.hudi.idtracking.models.IdMapping; +import org.apache.avro.Schema; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; +import org.apache.xtable.collectors.CustomCollectors; +import org.apache.xtable.exception.UnsupportedSchemaTypeException; +import org.apache.xtable.model.schema.InternalField; +import org.apache.parquet.schema.Type.Repetition; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.schema.InternalType; +import org.apache.parquet.schema.Type.ID; +import org.apache.parquet.schema.OriginalType; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.schema.Types; +import org.apache.parquet.column.ColumnDescriptor; + + +/** + * Class that converts parquet Schema {@link Schema} to Canonical Schema {@link InternalSchema} and + * vice-versa. 
This conversion is fully reversible and there is a strict 1 to 1 mapping between + * parquet data types and canonical data types. + */ +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class ParquetSchemaExtractor { + // parquet only supports string keys in maps + private static final InternalField MAP_KEY_FIELD = + InternalField.builder() + .name(InternalField.Constants.MAP_KEY_FIELD_NAME) + .schema( + InternalSchema.builder() + .name("map_key") + .dataType(InternalType.STRING) + .isNullable(false) + .build()) + .defaultValue("") + .build(); + private static final ParquetSchemaExtractor INSTANCE = new ParquetSchemaExtractor(); + private static final String ELEMENT = "element"; + private static final String KEY = "key"; + private static final String VALUE = "value"; + + public static ParquetSchemaExtractor getInstance() { + return INSTANCE; + } + + private static boolean groupTypeIsNullable(Type schema) { Review Comment: I don't think this is specific to the group type; let's rename it to just `isNullable`. ########## xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetSchemaExtractor.java: ########## @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.xtable.parquet; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.*; + +import org.junit.jupiter.api.Test; +import org.apache.parquet.schema.*; +import org.junit.jupiter.api.Assertions; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.schema.InternalType; +import org.apache.parquet.schema.LogicalTypeAnnotation.IntLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.StringLogicalTypeAnnotation; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.schema.Type.Repetition; +import org.apache.parquet.schema.Type; +import org.apache.parquet.schema.Types; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.GroupType; +import org.apache.xtable.model.schema.InternalField; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.schema.InternalType; +import org.apache.parquet.schema.OriginalType; + +import java.util.Map; +import java.util.HashMap; +import java.util.Arrays; + + +public class TestParquetSchemaExtractor { + private static final ParquetSchemaExtractor schemaExtractor = + ParquetSchemaExtractor.getInstance(); + + @Test + public void testPrimitiveTypes() { + + InternalSchema simpleList = InternalSchema.builder() + .name("my_list") + .isNullable(false) + .dataType(InternalType.LIST) + .fields( + Arrays.asList( + InternalField.builder() + .name(InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME) + .parentPath(null) + .schema( + InternalSchema.builder() + .name("element") + .dataType(InternalType.INT) + .isNullable(false) + .build()) + .build())) + .build(); + + + //map + + InternalSchema internalMap = InternalSchema.builder() + .name("map") + .isNullable(false) + .dataType(InternalType.MAP) + .fields( + Arrays.asList( + InternalField.builder() + .name("key") + .parentPath("_one_field_value") + .schema( + 
InternalSchema.builder() + .name("key") + .dataType(InternalType.FLOAT) + .isNullable(false) + .build()) + .defaultValue(null) + .build(), + InternalField.builder() + .name("value") + .parentPath("_one_field_value") + .schema( + InternalSchema.builder() + .name("value") + .dataType(InternalType.INT) + .isNullable(false) + .build()) + .build())) + .build(); + + + InternalSchema primitive1 = + InternalSchema.builder().name("integer").dataType(InternalType.INT).build(); + InternalSchema primitive2 = + InternalSchema.builder().name("string").dataType(InternalType.STRING).build(); + InternalSchema group1 = + InternalSchema.builder().name("list").dataType(InternalType.LIST).build(); + InternalSchema recordListElementSchema = + InternalSchema.builder() + .name("my_group") + .isNullable(false) + .fields( + Arrays.asList( + InternalField.builder() + .name("id") + .parentPath("my_group") + .schema( + InternalSchema.builder() + .name("id") + .dataType(InternalType.LONG) + .isNullable(false) + .build()) + .build(), + InternalField.builder() + .name("name") + .parentPath("my_group") + .schema( + InternalSchema.builder() + .name("name") + .dataType(InternalType.STRING) + .isNullable(true) + .build()) + .defaultValue(null) + .build())) + .dataType(InternalType.RECORD) + .build(); + InternalSchema internalSchema = + InternalSchema.builder() + .name("my_record") + .dataType(InternalType.RECORD) + .isNullable(true) + .fields( + Arrays.asList( + InternalField.builder() + .name("my_list") + .schema( + InternalSchema.builder() + .name("my_list") + .isNullable(false) + .dataType(InternalType.LIST) + .fields( + Arrays.asList( + InternalField.builder() + .name(InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME) + .parentPath("my_list") + .schema( + InternalSchema.builder() + .name("element") + .dataType(InternalType.INT) + .isNullable(true) + .build()) + .build())) + .build()) + .build(), + InternalField.builder() + .name("my_group") + .schema(recordListElementSchema) + 
/*InternalSchema.builder() + .name("array") + .isNullable(true) + .dataType(InternalType.RECORD) + .fields( + Arrays.asList( + InternalField.builder() + .name(InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME) + .parentPath("my_group") + .schema(recordListElementSchema) + .build())) + .build())*/ + .defaultValue(null) + .build())) + .build(); + + + Map<InternalSchema.MetadataKey, Object> fixedDecimalMetadata = new HashMap<>(); + fixedDecimalMetadata.put(InternalSchema.MetadataKey.DECIMAL_PRECISION, 6); + fixedDecimalMetadata.put(InternalSchema.MetadataKey.DECIMAL_SCALE, 5); + InternalSchema decimalType = + InternalSchema.builder().name("decimal").dataType(InternalType.DECIMAL).isNullable(false) + .metadata(fixedDecimalMetadata).build(); + + Type stringPrimitiveType = Types + .required(PrimitiveTypeName.BINARY).as(LogicalTypeAnnotation.stringType())//.named("string") + .named("string"); + + Type intPrimitiveType = Types + .required(PrimitiveTypeName.INT32).as(LogicalTypeAnnotation.intType(32, false)) + .named("integer"); + + Type decimalPrimitive = Types + .required(PrimitiveTypeName.INT32).as(LogicalTypeAnnotation.decimalType(5, 6)) + .named("decimal"); + + /* Assertions.assertEquals( + primitive1, schemaExtractor.toInternalSchema(intPrimitiveType, null)); + + Assertions.assertEquals( + primitive2, schemaExtractor.toInternalSchema(stringPrimitiveType, null));*/ + + GroupType testGroupType = Types.requiredGroup() + .required(PrimitiveTypeName.INT64).named("id") + .optional(PrimitiveTypeName.BINARY).as(LogicalTypeAnnotation.stringType()).named("name") + //.required(PrimitiveTypeName.INT32).as(LogicalTypeAnnotation.dateType()).named("date") + .named("my_group"); + + /* GroupType nestedGroupType = Types.requiredGroup() + .required(INT64).named("id") + .optional(BINARY).as(UTF8).named("email") + .optionalGroup() + .required(BINARY).as(UTF8).named("street") + .required(INT32).named("zipcode") + .named("address") + .named("User");*/ + + GroupType testMap = 
Types.requiredMap() + .key(Types.primitive(PrimitiveTypeName.FLOAT, Repetition.REQUIRED).named("key")) + .value(Types.primitive(PrimitiveTypeName.INT32, Repetition.REQUIRED).named("value")) + .named("map"); + GroupType listType = Types.requiredList().setElementType(Types.primitive(PrimitiveTypeName.INT32, Repetition.REQUIRED).named("element")).named("my_list"); + MessageType messageType = Types.buildMessage() + //.addField(testMap) + .addField(listType) + .addField(testGroupType) + .named("my_record"); + + /*GroupType nestedList = Types.requiredList() + .optionalList() + .requiredElement(PrimitiveTypeNameINT32).named("integer") + .named("nestedListInner1") + .optionalList() + .requiredElement(PrimitiveTypeNameINT32).named("integer") + .named("nestedListInner2") + .named("nestedListOuter");*/ + + + // Assertions.assertEquals( + // decimalType, schemaExtractor.toInternalSchema(decimalPrimitive, null)); + + // Assertions.assertEquals( + // internalMap, schemaExtractor.toInternalSchema(testMap, null)); + /* Assertions.assertEquals( + internalSchema, schemaExtractor.toInternalSchema(messageType, null));*/ + + + + + /* testing from fromInternalSchema()*/ + + GroupType fromSimpleList = Types.requiredList().element(Types.required(PrimitiveTypeName.INT32).as(LogicalTypeAnnotation.intType(32, false)).named(InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME)).named("my_list"); + + InternalSchema fromInternalList = InternalSchema.builder() + .name("my_list") + .isNullable(false) + .dataType(InternalType.LIST) + .fields( + Arrays.asList( + InternalField.builder() + .name(InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME) + .parentPath(null) + .schema( + InternalSchema.builder() + .name(InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME) + .dataType(InternalType.INT) + .isNullable(false) + .build()) + .build())) + .build(); + + GroupType fromTestMap = Types.requiredMap() + .key(Types.primitive(PrimitiveTypeName.FLOAT, Repetition.REQUIRED).named("key")) + 
.value(Types.primitive(PrimitiveTypeName.INT32, Repetition.REQUIRED).as(LogicalTypeAnnotation.intType(32, false)).named("value")) + .named("map"); + InternalSchema fromInternalMap = InternalSchema.builder() + .name("map") + .isNullable(false) + .dataType(InternalType.MAP) + .fields( + Arrays.asList( + InternalField.builder() + .name("_one_field_key")//"key") + .parentPath("_one_field_value") + .schema( + InternalSchema.builder() + .name("key") + .dataType(InternalType.FLOAT) + .isNullable(false) + .build()) + .defaultValue(null) + .build(), + InternalField.builder() + .name("_one_field_value")//"value") + .parentPath("_one_field_value") + .schema( + InternalSchema.builder() + .name("value") + .dataType(InternalType.INT) + .isNullable(false) + .build()) + .build())) + .build(); + + // Assertions.assertEquals( + // fromTestMap, schemaExtractor.fromInternalSchema(fromInternalMap, null)); + // Assertions.assertEquals(fromSimpleList, schemaExtractor.fromInternalSchema(fromInternalList, null)); + + + // tests for timestamp and date + InternalSchema testDate = + InternalSchema.builder().name("date").dataType(InternalType.DATE).isNullable(false).build(); + + + Map<InternalSchema.MetadataKey, Object> millisMetadata = new HashMap<>(); + millisMetadata.put( + InternalSchema.MetadataKey.TIMESTAMP_PRECISION, InternalSchema.MetadataValue.MILLIS); Review Comment: Just a tip: you can use `Collections.singletonMap(key, value)` when setting up these maps with a single value to avoid some of the Java verbosity -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@xtable.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org