the-other-tim-brown commented on code in PR #14355: URL: https://github.com/apache/hudi/pull/14355#discussion_r2579424366
########## hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java: ########## @@ -0,0 +1,432 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.util; + +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaType; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.VarCharType; +import org.apache.flink.table.types.logical.VarBinaryType; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Tests for {@link HoodieSchemaConverter}. 
+ */ +public class TestHoodieSchemaConverter { + + @Test + public void testPrimitiveTypes() { + // String + HoodieSchema stringSchema = HoodieSchemaConverter.convertToSchema( + DataTypes.STRING().notNull().getLogicalType()); + assertEquals(HoodieSchemaType.STRING, stringSchema.getType()); + + // Int + HoodieSchema intSchema = HoodieSchemaConverter.convertToSchema( + DataTypes.INT().notNull().getLogicalType()); + assertEquals(HoodieSchemaType.INT, intSchema.getType()); + + // Long + HoodieSchema longSchema = HoodieSchemaConverter.convertToSchema( + DataTypes.BIGINT().notNull().getLogicalType()); + assertEquals(HoodieSchemaType.LONG, longSchema.getType()); + + // Float + HoodieSchema floatSchema = HoodieSchemaConverter.convertToSchema( + DataTypes.FLOAT().notNull().getLogicalType()); + assertEquals(HoodieSchemaType.FLOAT, floatSchema.getType()); + + // Double + HoodieSchema doubleSchema = HoodieSchemaConverter.convertToSchema( + DataTypes.DOUBLE().notNull().getLogicalType()); + assertEquals(HoodieSchemaType.DOUBLE, doubleSchema.getType()); + + // Boolean + HoodieSchema boolSchema = HoodieSchemaConverter.convertToSchema( + DataTypes.BOOLEAN().notNull().getLogicalType()); + assertEquals(HoodieSchemaType.BOOLEAN, boolSchema.getType()); + + // Bytes + HoodieSchema bytesSchema = HoodieSchemaConverter.convertToSchema( + DataTypes.BYTES().notNull().getLogicalType()); + assertEquals(HoodieSchemaType.BYTES, bytesSchema.getType()); + } + + @Test + public void testNullableTypes() { + HoodieSchema nullableString = HoodieSchemaConverter.convertToSchema( + DataTypes.STRING().nullable().getLogicalType()); + assertEquals(HoodieSchemaType.UNION, nullableString.getType()); + assertTrue(nullableString.isNullable()); + + HoodieSchema nullableInt = HoodieSchemaConverter.convertToSchema( + DataTypes.INT().nullable().getLogicalType()); + assertEquals(HoodieSchemaType.UNION, nullableInt.getType()); + assertTrue(nullableInt.isNullable()); + } + + @Test + public void testTemporalTypes() { + // Date + HoodieSchema dateSchema = HoodieSchemaConverter.convertToSchema( + DataTypes.DATE().notNull().getLogicalType()); + assertEquals(HoodieSchemaType.DATE, dateSchema.getType()); + + // Time + HoodieSchema timeSchema = HoodieSchemaConverter.convertToSchema( + DataTypes.TIME(3).notNull().getLogicalType()); + assertEquals(HoodieSchemaType.TIME, timeSchema.getType()); Review Comment: Check the time precision here as well and add a test for Time-Micros ########## hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java: ########## @@ -0,0 +1,467 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.util; + +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaField; +import org.apache.hudi.common.schema.HoodieSchemaType; +import org.apache.hudi.common.util.ReflectionUtils; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeFamily; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.MultisetType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.TimeType; +import org.apache.flink.table.types.logical.TimestampType; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Converts Flink's LogicalType into HoodieSchema. + */ +public class HoodieSchemaConverter { + + /** + * Converts a Flink LogicalType into a HoodieSchema. + * + * <p>Uses "record" as the default type name for record types. + * + * @param logicalType Flink logical type definition + * @return HoodieSchema matching the logical type + */ + public static HoodieSchema convertToSchema(LogicalType logicalType) { + return convertToSchema(logicalType, "record"); + } + + /** + * Converts a Flink LogicalType into a HoodieSchema with specified record name. + * + * <p>The "{rowName}." is used as the nested row type name prefix in order to generate + * the right schema. Nested record types that only differ by type name are still compatible. + * + * @param logicalType Flink logical type + * @param rowName the record name + * @return HoodieSchema matching this logical type + */ + public static HoodieSchema convertToSchema(LogicalType logicalType, String rowName) { + int precision; + boolean nullable = logicalType.isNullable(); + + switch (logicalType.getTypeRoot()) { + case NULL: + return HoodieSchema.create(HoodieSchemaType.NULL); + + case BOOLEAN: + HoodieSchema bool = HoodieSchema.create(HoodieSchemaType.BOOLEAN); + return nullable ? nullableSchema(bool) : bool; + + case TINYINT: + case SMALLINT: + case INTEGER: + HoodieSchema integer = HoodieSchema.create(HoodieSchemaType.INT); + return nullable ? nullableSchema(integer) : integer; + + case BIGINT: + HoodieSchema bigint = HoodieSchema.create(HoodieSchemaType.LONG); + return nullable ? nullableSchema(bigint) : bigint; + + case FLOAT: + HoodieSchema f = HoodieSchema.create(HoodieSchemaType.FLOAT); + return nullable ? nullableSchema(f) : f; + + case DOUBLE: + HoodieSchema d = HoodieSchema.create(HoodieSchemaType.DOUBLE); + return nullable ? nullableSchema(d) : d; + + case CHAR: + case VARCHAR: + HoodieSchema str = HoodieSchema.create(HoodieSchemaType.STRING); + return nullable ? nullableSchema(str) : str; + + case BINARY: + case VARBINARY: + HoodieSchema binary = HoodieSchema.create(HoodieSchemaType.BYTES); + return nullable ? 
nullableSchema(binary) : binary; + + case TIMESTAMP_WITHOUT_TIME_ZONE: + final TimestampType timestampType = (TimestampType) logicalType; + precision = timestampType.getPrecision(); + HoodieSchema timestamp; + if (precision <= 3) { + timestamp = HoodieSchema.createTimestampMillis(); + } else if (precision <= 6) { + timestamp = HoodieSchema.createTimestampMicros(); + } else { + throw new IllegalArgumentException( + "HoodieSchema does not support TIMESTAMP type with precision: " + + precision + + ", it only supports precisions <= 6."); + } + return nullable ? nullableSchema(timestamp) : timestamp; + + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + final LocalZonedTimestampType localZonedTimestampType = (LocalZonedTimestampType) logicalType; + precision = localZonedTimestampType.getPrecision(); + HoodieSchema localTimestamp; + if (precision <= 3) { + localTimestamp = HoodieSchema.createLocalTimestampMillis(); + } else if (precision <= 6) { + localTimestamp = HoodieSchema.createLocalTimestampMicros(); + } else { + throw new IllegalArgumentException( + "HoodieSchema does not support LOCAL TIMESTAMP type with precision: " + + precision + + ", it only supports precisions <= 6."); + } + return nullable ? nullableSchema(localTimestamp) : localTimestamp; + + case DATE: + HoodieSchema date = HoodieSchema.createDate(); + return nullable ? nullableSchema(date) : date; + + case TIME_WITHOUT_TIME_ZONE: + precision = ((TimeType) logicalType).getPrecision(); + if (precision > 3) { + throw new IllegalArgumentException( + "HoodieSchema does not support TIME type with precision: " + + precision + + ", it only supports precision <= 3."); + } + HoodieSchema time = HoodieSchema.createTimeMillis(); + return nullable ? nullableSchema(time) : time; + + case DECIMAL: + DecimalType decimalType = (DecimalType) logicalType; + int fixedSize = computeMinBytesForDecimalPrecision(decimalType.getPrecision()); + HoodieSchema decimal = HoodieSchema.createDecimal( + String.format("%s.fixed", rowName), + null, + null, + decimalType.getPrecision(), + decimalType.getScale(), + fixedSize + ); + return nullable ? nullableSchema(decimal) : decimal; + + case ROW: + RowType rowType = (RowType) logicalType; + List<String> fieldNames = rowType.getFieldNames(); + + List<HoodieSchemaField> hoodieFields = new ArrayList<>(); + for (int i = 0; i < rowType.getFieldCount(); i++) { + String fieldName = fieldNames.get(i); + LogicalType fieldType = rowType.getTypeAt(i); + + // Recursive call for field schema + HoodieSchema fieldSchema = convertToSchema(fieldType, rowName + "." + fieldName); + + // Create field with or without default value + HoodieSchemaField field; + if (fieldType.isNullable()) { + field = HoodieSchemaField.of(fieldName, fieldSchema, null, HoodieSchema.NULL_VALUE); + } else { + field = HoodieSchemaField.of(fieldName, fieldSchema); + } + hoodieFields.add(field); + } + + HoodieSchema record = HoodieSchema.createRecord(rowName, null, null, hoodieFields); + return nullable ? nullableSchema(record) : record; + + case MULTISET: + case MAP: + LogicalType valueType = extractValueTypeForMap(logicalType); + HoodieSchema valueSchema = convertToSchema(valueType, rowName); + HoodieSchema map = HoodieSchema.createMap(valueSchema); + return nullable ? nullableSchema(map) : map; + + case ARRAY: + ArrayType arrayType = (ArrayType) logicalType; + HoodieSchema elementSchema = convertToSchema(arrayType.getElementType(), rowName); + HoodieSchema array = HoodieSchema.createArray(elementSchema); + return nullable ? 
nullableSchema(array) : array; + + case RAW: + default: + throw new UnsupportedOperationException( + "Unsupported type for HoodieSchema conversion: " + logicalType); + } + } + + /** + * Extracts value type for map conversion. + * Maps must have string keys for Avro/HoodieSchema compatibility. + */ + private static LogicalType extractValueTypeForMap(LogicalType type) { + LogicalType keyType; + LogicalType valueType; + if (type instanceof MapType) { + MapType mapType = (MapType) type; + keyType = mapType.getKeyType(); + valueType = mapType.getValueType(); + } else { + MultisetType multisetType = (MultisetType) type; + keyType = multisetType.getElementType(); + valueType = new IntType(); + } + if (!isFamily(keyType, LogicalTypeFamily.CHARACTER_STRING)) { + throw new UnsupportedOperationException( + "HoodieSchema doesn't support non-string as key type of map. " + + "The key type is: " + + keyType.asSummaryString()); + } + return valueType; + } + + /** + * Returns whether the given logical type belongs to the family. + */ + private static boolean isFamily(LogicalType logicalType, LogicalTypeFamily family) { + return logicalType.getTypeRoot().getFamilies().contains(family); + } + + /** + * Returns schema with nullable wrapper. + */ + private static HoodieSchema nullableSchema(HoodieSchema schema) { + return schema.isNullable() + ? schema + : HoodieSchema.createNullable(schema); + } Review Comment: `HoodieSchema.createNullable` will already check if the existing schema is nullable for you. ########## hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java: ########## @@ -0,0 +1,467 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.util; + +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaField; +import org.apache.hudi.common.schema.HoodieSchemaType; +import org.apache.hudi.common.util.ReflectionUtils; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeFamily; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.MultisetType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.TimeType; +import org.apache.flink.table.types.logical.TimestampType; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Converts Flink's LogicalType into HoodieSchema. + */ +public class HoodieSchemaConverter { + + /** + * Converts a Flink LogicalType into a HoodieSchema. + * + * <p>Uses "record" as the default type name for record types. + * + * @param logicalType Flink logical type definition + * @return HoodieSchema matching the logical type + */ + public static HoodieSchema convertToSchema(LogicalType logicalType) { + return convertToSchema(logicalType, "record"); + } + + /** + * Converts a Flink LogicalType into a HoodieSchema with specified record name. + * + * <p>The "{rowName}." is used as the nested row type name prefix in order to generate + * the right schema. Nested record types that only differ by type name are still compatible. + * + * @param logicalType Flink logical type + * @param rowName the record name + * @return HoodieSchema matching this logical type + */ + public static HoodieSchema convertToSchema(LogicalType logicalType, String rowName) { + int precision; + boolean nullable = logicalType.isNullable(); + + switch (logicalType.getTypeRoot()) { + case NULL: + return HoodieSchema.create(HoodieSchemaType.NULL); + + case BOOLEAN: + HoodieSchema bool = HoodieSchema.create(HoodieSchemaType.BOOLEAN); + return nullable ? nullableSchema(bool) : bool; + + case TINYINT: + case SMALLINT: + case INTEGER: + HoodieSchema integer = HoodieSchema.create(HoodieSchemaType.INT); + return nullable ? nullableSchema(integer) : integer; + + case BIGINT: + HoodieSchema bigint = HoodieSchema.create(HoodieSchemaType.LONG); + return nullable ? nullableSchema(bigint) : bigint; + + case FLOAT: + HoodieSchema f = HoodieSchema.create(HoodieSchemaType.FLOAT); + return nullable ? nullableSchema(f) : f; + + case DOUBLE: + HoodieSchema d = HoodieSchema.create(HoodieSchemaType.DOUBLE); + return nullable ? nullableSchema(d) : d; + + case CHAR: + case VARCHAR: + HoodieSchema str = HoodieSchema.create(HoodieSchemaType.STRING); + return nullable ? nullableSchema(str) : str; + + case BINARY: + case VARBINARY: + HoodieSchema binary = HoodieSchema.create(HoodieSchemaType.BYTES); + return nullable ? 
nullableSchema(binary) : binary; + + case TIMESTAMP_WITHOUT_TIME_ZONE: + final TimestampType timestampType = (TimestampType) logicalType; + precision = timestampType.getPrecision(); + HoodieSchema timestamp; + if (precision <= 3) { + timestamp = HoodieSchema.createTimestampMillis(); + } else if (precision <= 6) { + timestamp = HoodieSchema.createTimestampMicros(); + } else { + throw new IllegalArgumentException( + "HoodieSchema does not support TIMESTAMP type with precision: " + + precision + + ", it only supports precisions <= 6."); + } + return nullable ? nullableSchema(timestamp) : timestamp; + + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + final LocalZonedTimestampType localZonedTimestampType = (LocalZonedTimestampType) logicalType; + precision = localZonedTimestampType.getPrecision(); + HoodieSchema localTimestamp; + if (precision <= 3) { + localTimestamp = HoodieSchema.createLocalTimestampMillis(); + } else if (precision <= 6) { + localTimestamp = HoodieSchema.createLocalTimestampMicros(); + } else { + throw new IllegalArgumentException( + "HoodieSchema does not support LOCAL TIMESTAMP type with precision: " + + precision + + ", it only supports precisions <= 6."); + } + return nullable ? nullableSchema(localTimestamp) : localTimestamp; + + case DATE: + HoodieSchema date = HoodieSchema.createDate(); + return nullable ? nullableSchema(date) : date; + + case TIME_WITHOUT_TIME_ZONE: + precision = ((TimeType) logicalType).getPrecision(); + if (precision > 3) { + throw new IllegalArgumentException( + "HoodieSchema does not support TIME type with precision: " + + precision + + ", it only supports precision <= 3."); + } + HoodieSchema time = HoodieSchema.createTimeMillis(); + return nullable ? nullableSchema(time) : time; + + case DECIMAL: + DecimalType decimalType = (DecimalType) logicalType; + int fixedSize = computeMinBytesForDecimalPrecision(decimalType.getPrecision()); + HoodieSchema decimal = HoodieSchema.createDecimal( + String.format("%s.fixed", rowName), + null, + null, + decimalType.getPrecision(), + decimalType.getScale(), + fixedSize + ); + return nullable ? nullableSchema(decimal) : decimal; + + case ROW: + RowType rowType = (RowType) logicalType; + List<String> fieldNames = rowType.getFieldNames(); + + List<HoodieSchemaField> hoodieFields = new ArrayList<>(); + for (int i = 0; i < rowType.getFieldCount(); i++) { + String fieldName = fieldNames.get(i); + LogicalType fieldType = rowType.getTypeAt(i); + + // Recursive call for field schema + HoodieSchema fieldSchema = convertToSchema(fieldType, rowName + "." + fieldName); + + // Create field with or without default value + HoodieSchemaField field; + if (fieldType.isNullable()) { + field = HoodieSchemaField.of(fieldName, fieldSchema, null, HoodieSchema.NULL_VALUE); + } else { + field = HoodieSchemaField.of(fieldName, fieldSchema); + } + hoodieFields.add(field); + } + + HoodieSchema record = HoodieSchema.createRecord(rowName, null, null, hoodieFields); + return nullable ? nullableSchema(record) : record; + + case MULTISET: + case MAP: + LogicalType valueType = extractValueTypeForMap(logicalType); + HoodieSchema valueSchema = convertToSchema(valueType, rowName); + HoodieSchema map = HoodieSchema.createMap(valueSchema); + return nullable ? nullableSchema(map) : map; + + case ARRAY: + ArrayType arrayType = (ArrayType) logicalType; + HoodieSchema elementSchema = convertToSchema(arrayType.getElementType(), rowName); + HoodieSchema array = HoodieSchema.createArray(elementSchema); + return nullable ? 
nullableSchema(array) : array; + + case RAW: + default: + throw new UnsupportedOperationException( + "Unsupported type for HoodieSchema conversion: " + logicalType); + } + } + + /** + * Extracts value type for map conversion. + * Maps must have string keys for Avro/HoodieSchema compatibility. + */ + private static LogicalType extractValueTypeForMap(LogicalType type) { + LogicalType keyType; + LogicalType valueType; + if (type instanceof MapType) { + MapType mapType = (MapType) type; + keyType = mapType.getKeyType(); + valueType = mapType.getValueType(); + } else { + MultisetType multisetType = (MultisetType) type; + keyType = multisetType.getElementType(); + valueType = new IntType(); + } + if (!isFamily(keyType, LogicalTypeFamily.CHARACTER_STRING)) { + throw new UnsupportedOperationException( + "HoodieSchema doesn't support non-string as key type of map. " + + "The key type is: " + + keyType.asSummaryString()); + } + return valueType; + } + + /** + * Returns whether the given logical type belongs to the family. + */ + private static boolean isFamily(LogicalType logicalType, LogicalTypeFamily family) { + return logicalType.getTypeRoot().getFamilies().contains(family); + } + + /** + * Returns schema with nullable wrapper. + */ + private static HoodieSchema nullableSchema(HoodieSchema schema) { + return schema.isNullable() + ? schema + : HoodieSchema.createNullable(schema); + } + + /** + * Computes minimum bytes needed for decimal precision. + * This ensures compatibility with Avro fixed-size decimal representation. + */ + private static int computeMinBytesForDecimalPrecision(int precision) { + int numBytes = 1; + while (Math.pow(2.0, 8 * numBytes - 1) < Math.pow(10.0, precision)) { + numBytes += 1; + } + return numBytes; + } + + // ===== Conversion from HoodieSchema to Flink DataType ===== + + /** + * Converts a HoodieSchema into Flink's DataType. + * + * <p>This method provides native conversion from HoodieSchema to Flink DataType + * without going through Avro intermediate representation, future-proofing the + * implementation against changes in the Avro layer. + * + * @param hoodieSchema the HoodieSchema to convert + * @return Flink DataType matching the schema + * @throws IllegalArgumentException if the schema contains unsupported types + */ + public static DataType convertToDataType(HoodieSchema hoodieSchema) { + if (hoodieSchema == null) { + throw new IllegalArgumentException("HoodieSchema cannot be null"); + } + + // Check for special subclasses first (before accessing type) + if (hoodieSchema instanceof HoodieSchema.Decimal) { + return convertDecimal(hoodieSchema); + } else if (hoodieSchema instanceof HoodieSchema.Timestamp) { + return convertTimestamp(hoodieSchema); + } else if (hoodieSchema instanceof HoodieSchema.Time) { + return convertTime(hoodieSchema); + } Review Comment: Instead of using `instanceof`, just use the switch on the type below ########## hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java: ########## @@ -0,0 +1,432 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.util; + +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaType; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.VarCharType; +import org.apache.flink.table.types.logical.VarBinaryType; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Tests for {@link HoodieSchemaConverter}. + */ +public class TestHoodieSchemaConverter { + + @Test + public void testPrimitiveTypes() { + // String + HoodieSchema stringSchema = HoodieSchemaConverter.convertToSchema( + DataTypes.STRING().notNull().getLogicalType()); + assertEquals(HoodieSchemaType.STRING, stringSchema.getType()); + + // Int + HoodieSchema intSchema = HoodieSchemaConverter.convertToSchema( + DataTypes.INT().notNull().getLogicalType()); + assertEquals(HoodieSchemaType.INT, intSchema.getType()); + + // Long + HoodieSchema longSchema = HoodieSchemaConverter.convertToSchema( + DataTypes.BIGINT().notNull().getLogicalType()); + assertEquals(HoodieSchemaType.LONG, longSchema.getType()); + + // Float + HoodieSchema floatSchema = HoodieSchemaConverter.convertToSchema( + DataTypes.FLOAT().notNull().getLogicalType()); + assertEquals(HoodieSchemaType.FLOAT, floatSchema.getType()); + + // Double + HoodieSchema doubleSchema = HoodieSchemaConverter.convertToSchema( + DataTypes.DOUBLE().notNull().getLogicalType()); + assertEquals(HoodieSchemaType.DOUBLE, doubleSchema.getType()); + + // Boolean + HoodieSchema boolSchema = HoodieSchemaConverter.convertToSchema( + DataTypes.BOOLEAN().notNull().getLogicalType()); + assertEquals(HoodieSchemaType.BOOLEAN, boolSchema.getType()); + + // Bytes + HoodieSchema bytesSchema = HoodieSchemaConverter.convertToSchema( + DataTypes.BYTES().notNull().getLogicalType()); + assertEquals(HoodieSchemaType.BYTES, bytesSchema.getType()); + } + + @Test + public void testNullableTypes() { + HoodieSchema nullableString = HoodieSchemaConverter.convertToSchema( + DataTypes.STRING().nullable().getLogicalType()); + assertEquals(HoodieSchemaType.UNION, nullableString.getType()); + assertTrue(nullableString.isNullable()); + + HoodieSchema nullableInt = HoodieSchemaConverter.convertToSchema( + DataTypes.INT().nullable().getLogicalType()); + assertEquals(HoodieSchemaType.UNION, nullableInt.getType()); + assertTrue(nullableInt.isNullable()); + } + + @Test + public void testTemporalTypes() { + // Date + HoodieSchema dateSchema = 
HoodieSchemaConverter.convertToSchema( + DataTypes.DATE().notNull().getLogicalType()); + assertEquals(HoodieSchemaType.DATE, dateSchema.getType()); + + // Time + HoodieSchema timeSchema = HoodieSchemaConverter.convertToSchema( + DataTypes.TIME(3).notNull().getLogicalType()); + assertEquals(HoodieSchemaType.TIME, timeSchema.getType()); + + // Timestamp millis + HoodieSchema timestampMillisSchema = HoodieSchemaConverter.convertToSchema( + DataTypes.TIMESTAMP(3).notNull().getLogicalType()); + assertEquals(HoodieSchemaType.TIMESTAMP, timestampMillisSchema.getType()); + assertTrue(timestampMillisSchema instanceof HoodieSchema.Timestamp); + assertEquals(HoodieSchema.TimePrecision.MILLIS, + ((HoodieSchema.Timestamp) timestampMillisSchema).getPrecision()); + + // Timestamp micros + HoodieSchema timestampMicrosSchema = HoodieSchemaConverter.convertToSchema( + DataTypes.TIMESTAMP(6).notNull().getLogicalType()); + assertEquals(HoodieSchemaType.TIMESTAMP, timestampMicrosSchema.getType()); + assertTrue(timestampMicrosSchema instanceof HoodieSchema.Timestamp); + assertEquals(HoodieSchema.TimePrecision.MICROS, + ((HoodieSchema.Timestamp) timestampMicrosSchema).getPrecision()); + + // Local timestamp millis + HoodieSchema localTimestampMillisSchema = HoodieSchemaConverter.convertToSchema( + DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull().getLogicalType()); + assertEquals(HoodieSchemaType.TIMESTAMP, localTimestampMillisSchema.getType()); + + // Local timestamp micros + HoodieSchema localTimestampMicrosSchema = HoodieSchemaConverter.convertToSchema( + DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6).notNull().getLogicalType()); + assertEquals(HoodieSchemaType.TIMESTAMP, localTimestampMicrosSchema.getType()); + } + + @Test + public void testDecimalType() { + HoodieSchema decimalSchema = HoodieSchemaConverter.convertToSchema( + DataTypes.DECIMAL(10, 2).notNull().getLogicalType(), "test"); + assertEquals(HoodieSchemaType.DECIMAL, decimalSchema.getType()); + assertTrue(decimalSchema instanceof HoodieSchema.Decimal); + + HoodieSchema.Decimal decimal = (HoodieSchema.Decimal) decimalSchema; + assertEquals(10, decimal.getPrecision()); + assertEquals(2, decimal.getScale()); + } + + @Test + public void testArrayType() { Review Comment: Let's test arrays and maps with nullable elements as well as non-null ########## hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java: ########## @@ -0,0 +1,432 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.util; + +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaType; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.VarCharType; +import org.apache.flink.table.types.logical.VarBinaryType; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Tests for {@link HoodieSchemaConverter}. + */ +public class TestHoodieSchemaConverter { + + @Test + public void testPrimitiveTypes() { + // String + HoodieSchema stringSchema = HoodieSchemaConverter.convertToSchema( + DataTypes.STRING().notNull().getLogicalType()); + assertEquals(HoodieSchemaType.STRING, stringSchema.getType()); + + // Int + HoodieSchema intSchema = HoodieSchemaConverter.convertToSchema( + DataTypes.INT().notNull().getLogicalType()); + assertEquals(HoodieSchemaType.INT, intSchema.getType()); + + // Long + HoodieSchema longSchema = HoodieSchemaConverter.convertToSchema( + DataTypes.BIGINT().notNull().getLogicalType()); + assertEquals(HoodieSchemaType.LONG, longSchema.getType()); + + // Float + HoodieSchema floatSchema = HoodieSchemaConverter.convertToSchema( + DataTypes.FLOAT().notNull().getLogicalType()); + assertEquals(HoodieSchemaType.FLOAT, floatSchema.getType()); + + // Double + HoodieSchema doubleSchema = HoodieSchemaConverter.convertToSchema( + DataTypes.DOUBLE().notNull().getLogicalType()); + assertEquals(HoodieSchemaType.DOUBLE, doubleSchema.getType()); + + // Boolean + HoodieSchema boolSchema = HoodieSchemaConverter.convertToSchema( + DataTypes.BOOLEAN().notNull().getLogicalType()); + assertEquals(HoodieSchemaType.BOOLEAN, boolSchema.getType()); + + // Bytes + HoodieSchema bytesSchema = HoodieSchemaConverter.convertToSchema( + DataTypes.BYTES().notNull().getLogicalType()); + assertEquals(HoodieSchemaType.BYTES, bytesSchema.getType()); + } + + @Test + public void testNullableTypes() { + HoodieSchema nullableString = HoodieSchemaConverter.convertToSchema( + DataTypes.STRING().nullable().getLogicalType()); + assertEquals(HoodieSchemaType.UNION, nullableString.getType()); + assertTrue(nullableString.isNullable()); + + HoodieSchema nullableInt = HoodieSchemaConverter.convertToSchema( + DataTypes.INT().nullable().getLogicalType()); + assertEquals(HoodieSchemaType.UNION, nullableInt.getType()); + assertTrue(nullableInt.isNullable()); + } + + @Test + public void testTemporalTypes() { + // Date + HoodieSchema dateSchema = HoodieSchemaConverter.convertToSchema( + DataTypes.DATE().notNull().getLogicalType()); + assertEquals(HoodieSchemaType.DATE, dateSchema.getType()); + + // Time + HoodieSchema timeSchema = HoodieSchemaConverter.convertToSchema( + DataTypes.TIME(3).notNull().getLogicalType()); + assertEquals(HoodieSchemaType.TIME, timeSchema.getType()); + + // Timestamp millis + HoodieSchema timestampMillisSchema = HoodieSchemaConverter.convertToSchema( + 
DataTypes.TIMESTAMP(3).notNull().getLogicalType()); + assertEquals(HoodieSchemaType.TIMESTAMP, timestampMillisSchema.getType()); + assertTrue(timestampMillisSchema instanceof HoodieSchema.Timestamp); + assertEquals(HoodieSchema.TimePrecision.MILLIS, + ((HoodieSchema.Timestamp) timestampMillisSchema).getPrecision()); + + // Timestamp micros + HoodieSchema timestampMicrosSchema = HoodieSchemaConverter.convertToSchema( + DataTypes.TIMESTAMP(6).notNull().getLogicalType()); + assertEquals(HoodieSchemaType.TIMESTAMP, timestampMicrosSchema.getType()); + assertTrue(timestampMicrosSchema instanceof HoodieSchema.Timestamp); + assertEquals(HoodieSchema.TimePrecision.MICROS, + ((HoodieSchema.Timestamp) timestampMicrosSchema).getPrecision()); + + // Local timestamp millis + HoodieSchema localTimestampMillisSchema = HoodieSchemaConverter.convertToSchema( + DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull().getLogicalType()); + assertEquals(HoodieSchemaType.TIMESTAMP, localTimestampMillisSchema.getType()); + + // Local timestamp micros + HoodieSchema localTimestampMicrosSchema = HoodieSchemaConverter.convertToSchema( + DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6).notNull().getLogicalType()); + assertEquals(HoodieSchemaType.TIMESTAMP, localTimestampMicrosSchema.getType()); + } + + @Test + public void testDecimalType() { + HoodieSchema decimalSchema = HoodieSchemaConverter.convertToSchema( + DataTypes.DECIMAL(10, 2).notNull().getLogicalType(), "test"); + assertEquals(HoodieSchemaType.DECIMAL, decimalSchema.getType()); + assertTrue(decimalSchema instanceof HoodieSchema.Decimal); + + HoodieSchema.Decimal decimal = (HoodieSchema.Decimal) decimalSchema; + assertEquals(10, decimal.getPrecision()); + assertEquals(2, decimal.getScale()); + } + + @Test + public void testArrayType() { + LogicalType arrayType = DataTypes.ARRAY(DataTypes.STRING().notNull()).notNull().getLogicalType(); + HoodieSchema arraySchema = HoodieSchemaConverter.convertToSchema(arrayType); + + assertEquals(HoodieSchemaType.ARRAY, arraySchema.getType()); + assertEquals(HoodieSchemaType.STRING, arraySchema.getElementType().getType()); + } + + @Test + public void testMapType() { + LogicalType mapType = DataTypes.MAP( + DataTypes.STRING().notNull(), + DataTypes.INT().notNull()).notNull().getLogicalType(); + HoodieSchema mapSchema = HoodieSchemaConverter.convertToSchema(mapType); + + assertEquals(HoodieSchemaType.MAP, mapSchema.getType()); + assertEquals(HoodieSchemaType.INT, mapSchema.getValueType().getType()); + } + + @Test + public void testRecordType() { + LogicalType recordType = DataTypes.ROW( + DataTypes.FIELD("id", DataTypes.INT().notNull()), + DataTypes.FIELD("name", DataTypes.STRING().notNull()), + DataTypes.FIELD("age", DataTypes.INT().nullable()) + ).notNull().getLogicalType(); + + HoodieSchema recordSchema = HoodieSchemaConverter.convertToSchema(recordType, "Person"); + + assertEquals(HoodieSchemaType.RECORD, recordSchema.getType()); + assertEquals(3, recordSchema.getFields().size()); + assertEquals("id", recordSchema.getFields().get(0).name()); + assertEquals("name", recordSchema.getFields().get(1).name()); + assertEquals("age", recordSchema.getFields().get(2).name()); + + // Verify nullable field + assertTrue(recordSchema.getFields().get(2).schema().isNullable()); + } + + @Test + public void testNestedRecordType() { + LogicalType nestedRecordType = DataTypes.ROW( + DataTypes.FIELD("id", DataTypes.INT().notNull()), + DataTypes.FIELD("address", DataTypes.ROW( + DataTypes.FIELD("street", DataTypes.STRING().notNull()), + 
DataTypes.FIELD("city", DataTypes.STRING().notNull()) + ).notNull()) + ).notNull().getLogicalType(); + + HoodieSchema nestedSchema = HoodieSchemaConverter.convertToSchema(nestedRecordType, "User"); + + assertEquals(HoodieSchemaType.RECORD, nestedSchema.getType()); + assertEquals(2, nestedSchema.getFields().size()); + + HoodieSchema addressSchema = nestedSchema.getFields().get(1).schema(); + assertEquals(HoodieSchemaType.RECORD, addressSchema.getType()); + assertEquals(2, addressSchema.getFields().size()); + } + + @Test + public void testCompareWithAvroConversion() { + // Test that HoodieSchemaConverter produces the same result as + // AvroSchemaConverter + HoodieSchema.fromAvroSchema() + + RowType flinkRowType = (RowType) DataTypes.ROW( + DataTypes.FIELD("id", DataTypes.BIGINT().notNull()), + DataTypes.FIELD("name", DataTypes.STRING().nullable()), + DataTypes.FIELD("timestamp", DataTypes.TIMESTAMP(3).notNull()), + DataTypes.FIELD("decimal_val", DataTypes.DECIMAL(10, 2).notNull()) + ).notNull().getLogicalType(); + + // Method 1: Direct HoodieSchema conversion + HoodieSchema directSchema = HoodieSchemaConverter.convertToSchema(flinkRowType, "TestRecord"); + + // Method 2: Via Avro conversion + HoodieSchema viaAvroSchema = HoodieSchema.fromAvroSchema( + AvroSchemaConverter.convertToSchema(flinkRowType, "TestRecord")); + + // Both should produce equivalent schemas + assertNotNull(directSchema); + assertNotNull(viaAvroSchema); + assertEquals(HoodieSchemaType.RECORD, directSchema.getType()); + assertEquals(HoodieSchemaType.RECORD, viaAvroSchema.getType()); + assertEquals(4, directSchema.getFields().size()); + assertEquals(4, viaAvroSchema.getFields().size()); + + // Verify field types match + for (int i = 0; i < 4; i++) { + assertEquals( + viaAvroSchema.getFields().get(i).schema().getType(), + directSchema.getFields().get(i).schema().getType(), + "Field " + i + " type mismatch" + ); + } + } + + @Test + public void testComplexNestedStructure() { + LogicalType complexType = DataTypes.ROW( + DataTypes.FIELD("id", DataTypes.STRING().notNull()), + DataTypes.FIELD("tags", DataTypes.ARRAY(DataTypes.STRING().notNull()).notNull()), + DataTypes.FIELD("metadata", DataTypes.MAP( + DataTypes.STRING().notNull(), + DataTypes.STRING().notNull()).notNull()), + DataTypes.FIELD("nested", DataTypes.ROW( + DataTypes.FIELD("value", DataTypes.DOUBLE().notNull()), + DataTypes.FIELD("items", DataTypes.ARRAY(DataTypes.INT().notNull()).notNull()) + ).notNull()) + ).notNull().getLogicalType(); + + HoodieSchema complexSchema = HoodieSchemaConverter.convertToSchema(complexType, "ComplexRecord"); + + assertNotNull(complexSchema); + assertEquals(HoodieSchemaType.RECORD, complexSchema.getType()); + assertEquals(4, complexSchema.getFields().size()); + + // Verify array field + assertEquals(HoodieSchemaType.ARRAY, complexSchema.getFields().get(1).schema().getType()); + + // Verify map field + assertEquals(HoodieSchemaType.MAP, complexSchema.getFields().get(2).schema().getType()); + + // Verify nested record + HoodieSchema nestedRecord = complexSchema.getFields().get(3).schema(); + assertEquals(HoodieSchemaType.RECORD, nestedRecord.getType()); + assertEquals(2, nestedRecord.getFields().size()); + } + + @Test + public void testNativeConversionMatchesAvroPath() { + // Verify native conversion produces same result as Avro path + RowType originalRowType = (RowType) DataTypes.ROW( + DataTypes.FIELD("id", DataTypes.BIGINT().notNull()), + DataTypes.FIELD("name", DataTypes.STRING().nullable()), + DataTypes.FIELD("age", 
DataTypes.INT().notNull()) + ).notNull().getLogicalType(); + + HoodieSchema hoodieSchema = HoodieSchemaConverter.convertToSchema(originalRowType, "TestRecord"); + + // Native conversion + DataType nativeResult = HoodieSchemaConverter.convertToDataType(hoodieSchema); + + // Avro path (for comparison) + DataType avroResult = AvroSchemaConverter.convertToDataType(hoodieSchema.getAvroSchema()); + + assertEquals(avroResult.getLogicalType(), nativeResult.getLogicalType()); + } + + @Test + public void testRoundTripConversion() { + RowType originalRowType = (RowType) DataTypes.ROW( + DataTypes.FIELD("id", DataTypes.BIGINT().notNull()), + DataTypes.FIELD("name", DataTypes.STRING().nullable()), + DataTypes.FIELD("age", DataTypes.INT().notNull()) + ).notNull().getLogicalType(); + + HoodieSchema hoodieSchema = HoodieSchemaConverter.convertToSchema(originalRowType, "TestRecord"); + RowType convertedRowType = HoodieSchemaConverter.convertToRowType(hoodieSchema); + + assertEquals(originalRowType, convertedRowType); + } + + @Test + public void testConvertPrimitiveTypesToDataType() { + RowType flinkRowType = (RowType) DataTypes.ROW( + DataTypes.FIELD("string_col", DataTypes.STRING().notNull()), + DataTypes.FIELD("int_col", DataTypes.INT().notNull()), + DataTypes.FIELD("long_col", DataTypes.BIGINT().notNull()), + DataTypes.FIELD("float_col", DataTypes.FLOAT().notNull()), + DataTypes.FIELD("double_col", DataTypes.DOUBLE().notNull()), + DataTypes.FIELD("boolean_col", DataTypes.BOOLEAN().notNull()), + DataTypes.FIELD("bytes_col", DataTypes.BYTES().notNull()) + ).notNull().getLogicalType(); + + HoodieSchema hoodieSchema = HoodieSchemaConverter.convertToSchema(flinkRowType); + RowType result = HoodieSchemaConverter.convertToRowType(hoodieSchema); + + assertEquals(7, result.getFieldCount()); + assertEquals(flinkRowType, result); + } + + @Test + public void testConvertNullableTypesToDataType() { + RowType flinkRowType = (RowType) DataTypes.ROW( + DataTypes.FIELD("nullable_string", DataTypes.STRING().nullable()), + DataTypes.FIELD("nullable_int", DataTypes.INT().nullable()) + ).notNull().getLogicalType(); + + HoodieSchema hoodieSchema = HoodieSchemaConverter.convertToSchema(flinkRowType); + RowType result = HoodieSchemaConverter.convertToRowType(hoodieSchema); + + assertTrue(result.getTypeAt(0).isNullable()); + assertTrue(result.getTypeAt(1).isNullable()); + assertEquals(flinkRowType, result); + } + + @Test + public void testConvertTemporalTypesToDataType() { + RowType flinkRowType = (RowType) DataTypes.ROW( + DataTypes.FIELD("date_col", DataTypes.DATE().notNull()), + DataTypes.FIELD("time_col", DataTypes.TIME(3).notNull()), + DataTypes.FIELD("timestamp_millis", DataTypes.TIMESTAMP(3).notNull()), + DataTypes.FIELD("timestamp_micros", DataTypes.TIMESTAMP(6).notNull()), + DataTypes.FIELD("local_timestamp_millis", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull()), + DataTypes.FIELD("local_timestamp_micros", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6).notNull()) + ).notNull().getLogicalType(); + + HoodieSchema hoodieSchema = HoodieSchemaConverter.convertToSchema(flinkRowType); + RowType result = HoodieSchemaConverter.convertToRowType(hoodieSchema); + + assertEquals(flinkRowType, result); + assertEquals(3, ((TimestampType) result.getTypeAt(2)).getPrecision()); + assertEquals(6, ((TimestampType) result.getTypeAt(3)).getPrecision()); + } + + @Test + public void testConvertDecimalTypeToDataType() { + RowType flinkRowType = (RowType) DataTypes.ROW( + DataTypes.FIELD("decimal_col", DataTypes.DECIMAL(10, 2).notNull()) + 
).notNull().getLogicalType(); + + HoodieSchema hoodieSchema = HoodieSchemaConverter.convertToSchema(flinkRowType); + RowType result = HoodieSchemaConverter.convertToRowType(hoodieSchema); + + assertTrue(result.getTypeAt(0) instanceof DecimalType); + DecimalType decimal = (DecimalType) result.getTypeAt(0); + assertEquals(10, decimal.getPrecision()); + assertEquals(2, decimal.getScale()); + } + + @Test + public void testConvertComplexTypesToDataType() { + RowType flinkRowType = (RowType) DataTypes.ROW( + DataTypes.FIELD("array_col", DataTypes.ARRAY(DataTypes.STRING().notNull()).notNull()), + DataTypes.FIELD("map_col", DataTypes.MAP(DataTypes.STRING().notNull(), DataTypes.INT().notNull()).notNull()), + DataTypes.FIELD("nested_record", DataTypes.ROW( + DataTypes.FIELD("nested_id", DataTypes.INT().notNull()), + DataTypes.FIELD("nested_name", DataTypes.STRING().notNull()) + ).notNull()) + ).notNull().getLogicalType(); + + HoodieSchema hoodieSchema = HoodieSchemaConverter.convertToSchema(flinkRowType); Review Comment: Could we start by generating a HoodieSchema directly for some of these tests? ########## hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java: ########## @@ -0,0 +1,432 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.util; + +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaType; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.VarCharType; +import org.apache.flink.table.types.logical.VarBinaryType; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Tests for {@link HoodieSchemaConverter}. 
+ */ +public class TestHoodieSchemaConverter { + + @Test + public void testPrimitiveTypes() { + // String + HoodieSchema stringSchema = HoodieSchemaConverter.convertToSchema( + DataTypes.STRING().notNull().getLogicalType()); + assertEquals(HoodieSchemaType.STRING, stringSchema.getType()); + + // Int + HoodieSchema intSchema = HoodieSchemaConverter.convertToSchema( + DataTypes.INT().notNull().getLogicalType()); + assertEquals(HoodieSchemaType.INT, intSchema.getType()); + + // Long + HoodieSchema longSchema = HoodieSchemaConverter.convertToSchema( + DataTypes.BIGINT().notNull().getLogicalType()); + assertEquals(HoodieSchemaType.LONG, longSchema.getType()); + + // Float + HoodieSchema floatSchema = HoodieSchemaConverter.convertToSchema( + DataTypes.FLOAT().notNull().getLogicalType()); + assertEquals(HoodieSchemaType.FLOAT, floatSchema.getType()); + + // Double + HoodieSchema doubleSchema = HoodieSchemaConverter.convertToSchema( + DataTypes.DOUBLE().notNull().getLogicalType()); + assertEquals(HoodieSchemaType.DOUBLE, doubleSchema.getType()); + + // Boolean + HoodieSchema boolSchema = HoodieSchemaConverter.convertToSchema( + DataTypes.BOOLEAN().notNull().getLogicalType()); + assertEquals(HoodieSchemaType.BOOLEAN, boolSchema.getType()); + + // Bytes + HoodieSchema bytesSchema = HoodieSchemaConverter.convertToSchema( + DataTypes.BYTES().notNull().getLogicalType()); + assertEquals(HoodieSchemaType.BYTES, bytesSchema.getType()); + } + + @Test + public void testNullableTypes() { + HoodieSchema nullableString = HoodieSchemaConverter.convertToSchema( + DataTypes.STRING().nullable().getLogicalType()); + assertEquals(HoodieSchemaType.UNION, nullableString.getType()); + assertTrue(nullableString.isNullable()); + + HoodieSchema nullableInt = HoodieSchemaConverter.convertToSchema( + DataTypes.INT().nullable().getLogicalType()); + assertEquals(HoodieSchemaType.UNION, nullableInt.getType()); + assertTrue(nullableInt.isNullable()); + } + + @Test + public void testTemporalTypes() { + // Date + HoodieSchema dateSchema = HoodieSchemaConverter.convertToSchema( + DataTypes.DATE().notNull().getLogicalType()); + assertEquals(HoodieSchemaType.DATE, dateSchema.getType()); + + // Time + HoodieSchema timeSchema = HoodieSchemaConverter.convertToSchema( + DataTypes.TIME(3).notNull().getLogicalType()); + assertEquals(HoodieSchemaType.TIME, timeSchema.getType()); + + // Timestamp millis + HoodieSchema timestampMillisSchema = HoodieSchemaConverter.convertToSchema( + DataTypes.TIMESTAMP(3).notNull().getLogicalType()); + assertEquals(HoodieSchemaType.TIMESTAMP, timestampMillisSchema.getType()); + assertTrue(timestampMillisSchema instanceof HoodieSchema.Timestamp); + assertEquals(HoodieSchema.TimePrecision.MILLIS, + ((HoodieSchema.Timestamp) timestampMillisSchema).getPrecision()); + + // Timestamp micros + HoodieSchema timestampMicrosSchema = HoodieSchemaConverter.convertToSchema( + DataTypes.TIMESTAMP(6).notNull().getLogicalType()); + assertEquals(HoodieSchemaType.TIMESTAMP, timestampMicrosSchema.getType()); + assertTrue(timestampMicrosSchema instanceof HoodieSchema.Timestamp); + assertEquals(HoodieSchema.TimePrecision.MICROS, + ((HoodieSchema.Timestamp) timestampMicrosSchema).getPrecision()); + + // Local timestamp millis + HoodieSchema localTimestampMillisSchema = HoodieSchemaConverter.convertToSchema( + DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull().getLogicalType()); + assertEquals(HoodieSchemaType.TIMESTAMP, localTimestampMillisSchema.getType()); + + // Local timestamp micros + HoodieSchema 
localTimestampMicrosSchema = HoodieSchemaConverter.convertToSchema( + DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6).notNull().getLogicalType()); + assertEquals(HoodieSchemaType.TIMESTAMP, localTimestampMicrosSchema.getType()); + } + + @Test + public void testDecimalType() { + HoodieSchema decimalSchema = HoodieSchemaConverter.convertToSchema( + DataTypes.DECIMAL(10, 2).notNull().getLogicalType(), "test"); + assertEquals(HoodieSchemaType.DECIMAL, decimalSchema.getType()); + assertTrue(decimalSchema instanceof HoodieSchema.Decimal); + + HoodieSchema.Decimal decimal = (HoodieSchema.Decimal) decimalSchema; + assertEquals(10, decimal.getPrecision()); + assertEquals(2, decimal.getScale()); + } + + @Test + public void testArrayType() { + LogicalType arrayType = DataTypes.ARRAY(DataTypes.STRING().notNull()).notNull().getLogicalType(); + HoodieSchema arraySchema = HoodieSchemaConverter.convertToSchema(arrayType); + + assertEquals(HoodieSchemaType.ARRAY, arraySchema.getType()); + assertEquals(HoodieSchemaType.STRING, arraySchema.getElementType().getType()); + } + + @Test + public void testMapType() { + LogicalType mapType = DataTypes.MAP( + DataTypes.STRING().notNull(), + DataTypes.INT().notNull()).notNull().getLogicalType(); + HoodieSchema mapSchema = HoodieSchemaConverter.convertToSchema(mapType); + + assertEquals(HoodieSchemaType.MAP, mapSchema.getType()); + assertEquals(HoodieSchemaType.INT, mapSchema.getValueType().getType()); + } + + @Test + public void testRecordType() { + LogicalType recordType = DataTypes.ROW( + DataTypes.FIELD("id", DataTypes.INT().notNull()), + DataTypes.FIELD("name", DataTypes.STRING().notNull()), + DataTypes.FIELD("age", DataTypes.INT().nullable()) + ).notNull().getLogicalType(); + + HoodieSchema recordSchema = HoodieSchemaConverter.convertToSchema(recordType, "Person"); + + assertEquals(HoodieSchemaType.RECORD, recordSchema.getType()); + assertEquals(3, recordSchema.getFields().size()); + assertEquals("id", recordSchema.getFields().get(0).name()); + assertEquals("name", recordSchema.getFields().get(1).name()); + assertEquals("age", recordSchema.getFields().get(2).name()); + + // Verify nullable field + assertTrue(recordSchema.getFields().get(2).schema().isNullable()); + } + + @Test + public void testNestedRecordType() { + LogicalType nestedRecordType = DataTypes.ROW( + DataTypes.FIELD("id", DataTypes.INT().notNull()), + DataTypes.FIELD("address", DataTypes.ROW( + DataTypes.FIELD("street", DataTypes.STRING().notNull()), + DataTypes.FIELD("city", DataTypes.STRING().notNull()) + ).notNull()) + ).notNull().getLogicalType(); + + HoodieSchema nestedSchema = HoodieSchemaConverter.convertToSchema(nestedRecordType, "User"); + + assertEquals(HoodieSchemaType.RECORD, nestedSchema.getType()); + assertEquals(2, nestedSchema.getFields().size()); + + HoodieSchema addressSchema = nestedSchema.getFields().get(1).schema(); + assertEquals(HoodieSchemaType.RECORD, addressSchema.getType()); + assertEquals(2, addressSchema.getFields().size()); + } + + @Test + public void testCompareWithAvroConversion() { + // Test that HoodieSchemaConverter produces the same result as + // AvroSchemaConverter + HoodieSchema.fromAvroSchema() + + RowType flinkRowType = (RowType) DataTypes.ROW( + DataTypes.FIELD("id", DataTypes.BIGINT().notNull()), + DataTypes.FIELD("name", DataTypes.STRING().nullable()), + DataTypes.FIELD("timestamp", DataTypes.TIMESTAMP(3).notNull()), + DataTypes.FIELD("decimal_val", DataTypes.DECIMAL(10, 2).notNull()) + ).notNull().getLogicalType(); + + // Method 1: Direct HoodieSchema 
conversion + HoodieSchema directSchema = HoodieSchemaConverter.convertToSchema(flinkRowType, "TestRecord"); + + // Method 2: Via Avro conversion + HoodieSchema viaAvroSchema = HoodieSchema.fromAvroSchema( + AvroSchemaConverter.convertToSchema(flinkRowType, "TestRecord")); + + // Both should produce equivalent schemas + assertNotNull(directSchema); + assertNotNull(viaAvroSchema); + assertEquals(HoodieSchemaType.RECORD, directSchema.getType()); + assertEquals(HoodieSchemaType.RECORD, viaAvroSchema.getType()); + assertEquals(4, directSchema.getFields().size()); + assertEquals(4, viaAvroSchema.getFields().size()); + + // Verify field types match + for (int i = 0; i < 4; i++) { + assertEquals( + viaAvroSchema.getFields().get(i).schema().getType(), + directSchema.getFields().get(i).schema().getType(), + "Field " + i + " type mismatch" + ); + } + } + + @Test + public void testComplexNestedStructure() { + LogicalType complexType = DataTypes.ROW( + DataTypes.FIELD("id", DataTypes.STRING().notNull()), + DataTypes.FIELD("tags", DataTypes.ARRAY(DataTypes.STRING().notNull()).notNull()), + DataTypes.FIELD("metadata", DataTypes.MAP( + DataTypes.STRING().notNull(), + DataTypes.STRING().notNull()).notNull()), + DataTypes.FIELD("nested", DataTypes.ROW( + DataTypes.FIELD("value", DataTypes.DOUBLE().notNull()), + DataTypes.FIELD("items", DataTypes.ARRAY(DataTypes.INT().notNull()).notNull()) + ).notNull()) + ).notNull().getLogicalType(); + + HoodieSchema complexSchema = HoodieSchemaConverter.convertToSchema(complexType, "ComplexRecord"); + + assertNotNull(complexSchema); + assertEquals(HoodieSchemaType.RECORD, complexSchema.getType()); + assertEquals(4, complexSchema.getFields().size()); + + // Verify array field + assertEquals(HoodieSchemaType.ARRAY, complexSchema.getFields().get(1).schema().getType()); + + // Verify map field + assertEquals(HoodieSchemaType.MAP, complexSchema.getFields().get(2).schema().getType()); + + // Verify nested record + HoodieSchema nestedRecord = complexSchema.getFields().get(3).schema(); + assertEquals(HoodieSchemaType.RECORD, nestedRecord.getType()); + assertEquals(2, nestedRecord.getFields().size()); + } + + @Test + public void testNativeConversionMatchesAvroPath() { + // Verify native conversion produces same result as Avro path + RowType originalRowType = (RowType) DataTypes.ROW( + DataTypes.FIELD("id", DataTypes.BIGINT().notNull()), + DataTypes.FIELD("name", DataTypes.STRING().nullable()), + DataTypes.FIELD("age", DataTypes.INT().notNull()) + ).notNull().getLogicalType(); + + HoodieSchema hoodieSchema = HoodieSchemaConverter.convertToSchema(originalRowType, "TestRecord"); + + // Native conversion + DataType nativeResult = HoodieSchemaConverter.convertToDataType(hoodieSchema); + + // Avro path (for comparison) + DataType avroResult = AvroSchemaConverter.convertToDataType(hoodieSchema.getAvroSchema()); + + assertEquals(avroResult.getLogicalType(), nativeResult.getLogicalType()); + } + + @Test + public void testRoundTripConversion() { + RowType originalRowType = (RowType) DataTypes.ROW( + DataTypes.FIELD("id", DataTypes.BIGINT().notNull()), + DataTypes.FIELD("name", DataTypes.STRING().nullable()), + DataTypes.FIELD("age", DataTypes.INT().notNull()) + ).notNull().getLogicalType(); + + HoodieSchema hoodieSchema = HoodieSchemaConverter.convertToSchema(originalRowType, "TestRecord"); + RowType convertedRowType = HoodieSchemaConverter.convertToRowType(hoodieSchema); + + assertEquals(originalRowType, convertedRowType); + } + + @Test + public void testConvertPrimitiveTypesToDataType() { 
+ RowType flinkRowType = (RowType) DataTypes.ROW( + DataTypes.FIELD("string_col", DataTypes.STRING().notNull()), + DataTypes.FIELD("int_col", DataTypes.INT().notNull()), + DataTypes.FIELD("long_col", DataTypes.BIGINT().notNull()), + DataTypes.FIELD("float_col", DataTypes.FLOAT().notNull()), + DataTypes.FIELD("double_col", DataTypes.DOUBLE().notNull()), + DataTypes.FIELD("boolean_col", DataTypes.BOOLEAN().notNull()), + DataTypes.FIELD("bytes_col", DataTypes.BYTES().notNull()) + ).notNull().getLogicalType(); + + HoodieSchema hoodieSchema = HoodieSchemaConverter.convertToSchema(flinkRowType); + RowType result = HoodieSchemaConverter.convertToRowType(hoodieSchema); + + assertEquals(7, result.getFieldCount()); + assertEquals(flinkRowType, result); + } + + @Test + public void testConvertNullableTypesToDataType() { + RowType flinkRowType = (RowType) DataTypes.ROW( + DataTypes.FIELD("nullable_string", DataTypes.STRING().nullable()), + DataTypes.FIELD("nullable_int", DataTypes.INT().nullable()) + ).notNull().getLogicalType(); + + HoodieSchema hoodieSchema = HoodieSchemaConverter.convertToSchema(flinkRowType); + RowType result = HoodieSchemaConverter.convertToRowType(hoodieSchema); + + assertTrue(result.getTypeAt(0).isNullable()); + assertTrue(result.getTypeAt(1).isNullable()); + assertEquals(flinkRowType, result); + } + + @Test + public void testConvertTemporalTypesToDataType() { + RowType flinkRowType = (RowType) DataTypes.ROW( + DataTypes.FIELD("date_col", DataTypes.DATE().notNull()), + DataTypes.FIELD("time_col", DataTypes.TIME(3).notNull()), + DataTypes.FIELD("timestamp_millis", DataTypes.TIMESTAMP(3).notNull()), + DataTypes.FIELD("timestamp_micros", DataTypes.TIMESTAMP(6).notNull()), + DataTypes.FIELD("local_timestamp_millis", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull()), + DataTypes.FIELD("local_timestamp_micros", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6).notNull()) + ).notNull().getLogicalType(); + + HoodieSchema hoodieSchema = HoodieSchemaConverter.convertToSchema(flinkRowType); + RowType result = HoodieSchemaConverter.convertToRowType(hoodieSchema); + + assertEquals(flinkRowType, result); + assertEquals(3, ((TimestampType) result.getTypeAt(2)).getPrecision()); + assertEquals(6, ((TimestampType) result.getTypeAt(3)).getPrecision()); + } + + @Test + public void testConvertDecimalTypeToDataType() { + RowType flinkRowType = (RowType) DataTypes.ROW( + DataTypes.FIELD("decimal_col", DataTypes.DECIMAL(10, 2).notNull()) + ).notNull().getLogicalType(); + + HoodieSchema hoodieSchema = HoodieSchemaConverter.convertToSchema(flinkRowType); Review Comment: Let's add a conversion for a decimal backed by a fixed size byte array
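A rough sketch of what that test could look like, building the decimal schema directly through the HoodieSchema.createDecimal factory the converter itself uses (the argument order mirrors the converter code above; 5 bytes matches what computeMinBytesForDecimalPrecision yields for precision 10):

  @Test
  public void testConvertFixedBackedDecimalToDataType() {
    // Decimal backed by a fixed-size byte array: 5 bytes is the smallest
    // width whose signed range (2^39) covers 10 decimal digits (10^10).
    HoodieSchema fixedDecimal = HoodieSchema.createDecimal(
        "test.fixed", null, null, 10, 2, 5);

    DataType result = HoodieSchemaConverter.convertToDataType(fixedDecimal);

    assertTrue(result.getLogicalType() instanceof DecimalType);
    DecimalType decimal = (DecimalType) result.getLogicalType();
    assertEquals(10, decimal.getPrecision());
    assertEquals(2, decimal.getScale());
  }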
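For the earlier comment on testTemporalTypes ("Check the time precision here as well and add a test for Time-Micros"), a sketch of the extra assertions; it assumes HoodieSchema.Time exposes getPrecision() the same way HoodieSchema.Timestamp does, and that TIME precision above 3 keeps failing fast as the converter currently implements it:

  @Test
  public void testTimePrecision() {
    // Time-millis: assert the precision that was carried through, not just the type.
    HoodieSchema timeMillis = HoodieSchemaConverter.convertToSchema(
        DataTypes.TIME(3).notNull().getLogicalType());
    assertEquals(HoodieSchemaType.TIME, timeMillis.getType());
    assertTrue(timeMillis instanceof HoodieSchema.Time);
    assertEquals(HoodieSchema.TimePrecision.MILLIS,
        ((HoodieSchema.Time) timeMillis).getPrecision());

    // Time-micros: precision 6 is unsupported today and should be rejected loudly.
    assertThrows(IllegalArgumentException.class, () ->
        HoodieSchemaConverter.convertToSchema(
            DataTypes.TIME(6).notNull().getLogicalType()));
  }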
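For the nullableSchema comment, the helper can collapse to a single delegation. This sketch relies on the reviewer's note that HoodieSchema.createNullable already returns schemas unchanged when they are nullable:

  /**
   * Returns the schema wrapped as nullable. HoodieSchema.createNullable already
   * returns the input unchanged when it is nullable, so no guard is needed here.
   */
  private static HoodieSchema nullableSchema(HoodieSchema schema) {
    return HoodieSchema.createNullable(schema);
  }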
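For the convertToDataType comment, a sketch of dispatching on the schema type instead of probing subclasses; convertDecimal, convertTimestamp, and convertTime are the private helpers the PR already calls, and the remaining primitive and composite cases are elided:

  public static DataType convertToDataType(HoodieSchema hoodieSchema) {
    if (hoodieSchema == null) {
      throw new IllegalArgumentException("HoodieSchema cannot be null");
    }
    // Dispatch on the schema type rather than on the HoodieSchema subclass hierarchy.
    switch (hoodieSchema.getType()) {
      case DECIMAL:
        return convertDecimal(hoodieSchema);
      case TIMESTAMP:
        return convertTimestamp(hoodieSchema);
      case TIME:
        return convertTime(hoodieSchema);
      // ... cases for the primitive and composite types go here ...
      default:
        throw new IllegalArgumentException(
            "Unsupported type for DataType conversion: " + hoodieSchema.getType());
    }
  }

Switching on HoodieSchemaType keeps the behavior but no longer couples the dispatch to which HoodieSchema subclasses exist.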
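For the testArrayType comment, a sketch that covers nullable elements and values for both collection types, using only accessors the tests already exercise:

  @Test
  public void testArrayAndMapWithNullableElements() {
    // Array whose elements may be null: the element schema becomes a nullable union.
    HoodieSchema arraySchema = HoodieSchemaConverter.convertToSchema(
        DataTypes.ARRAY(DataTypes.STRING().nullable()).notNull().getLogicalType());
    assertEquals(HoodieSchemaType.ARRAY, arraySchema.getType());
    assertTrue(arraySchema.getElementType().isNullable());

    // Map whose values may be null: the value schema becomes a nullable union.
    HoodieSchema mapSchema = HoodieSchemaConverter.convertToSchema(
        DataTypes.MAP(DataTypes.STRING().notNull(), DataTypes.INT().nullable())
            .notNull().getLogicalType());
    assertEquals(HoodieSchemaType.MAP, mapSchema.getType());
    assertTrue(mapSchema.getValueType().isNullable());
  }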
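And for the testConvertComplexTypesToDataType comment, a sketch that starts from a hand-built HoodieSchema so the HoodieSchema-to-Flink direction is exercised on its own. It uses the factory methods the converter itself calls; HoodieSchemaField would need to be added to the test imports:

  @Test
  public void testConvertDirectlyBuiltSchemaToRowType() {
    // Build the schema by hand instead of round-tripping through a Flink type first.
    HoodieSchema schema = HoodieSchema.createRecord("record", null, null, Arrays.asList(
        HoodieSchemaField.of("id", HoodieSchema.create(HoodieSchemaType.INT)),
        HoodieSchemaField.of("tags",
            HoodieSchema.createArray(HoodieSchema.create(HoodieSchemaType.STRING))),
        HoodieSchemaField.of("attrs",
            HoodieSchema.createMap(HoodieSchema.create(HoodieSchemaType.LONG)))));

    RowType result = HoodieSchemaConverter.convertToRowType(schema);

    assertEquals(3, result.getFieldCount());
    assertTrue(result.getTypeAt(1) instanceof ArrayType);
    assertTrue(result.getTypeAt(2) instanceof MapType);
  }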
