pvary commented on code in PR #15475: URL: https://github.com/apache/iceberg/pull/15475#discussion_r2871557267
########## flink/v2.1/flink/src/main/java/org/apache/flink/formats/avro/AvroToRowDataConverters.java: ########## @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flink.formats.avro; + +import static org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.extractValueTypeToAvroMap; + +import java.io.Serializable; +import java.lang.reflect.Array; +import java.nio.ByteBuffer; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.temporal.ChronoField; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.avro.generic.GenericFixed; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.data.GenericMapData; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.utils.LogicalTypeUtils; + +/** Tool class used to convert from Avro {@link GenericRecord} to {@link RowData}. * */ +@Internal +public class AvroToRowDataConverters { + + /** + * Runtime converter that converts Avro data structures into objects of Flink Table & SQL internal + * data structures. + */ + @FunctionalInterface + public interface AvroToRowDataConverter extends Serializable { + Object convert(Object object); + } + + // ------------------------------------------------------------------------------------- + // Runtime Converters + // ------------------------------------------------------------------------------------- + + public static AvroToRowDataConverter createRowConverter(RowType rowType) { + return createRowConverter(rowType, true); + } + + public static AvroToRowDataConverter createRowConverter( + RowType rowType, boolean legacyTimestampMapping) { + final AvroToRowDataConverter[] fieldConverters = + rowType.getFields().stream() + .map(RowType.RowField::getType) + .map(type -> createNullableConverter(type, legacyTimestampMapping)) + .toArray(AvroToRowDataConverter[]::new); + final int arity = rowType.getFieldCount(); + + return avroObject -> { + IndexedRecord record = (IndexedRecord) avroObject; + GenericRowData row = new GenericRowData(arity); + for (int i = 0; i < arity; ++i) { + // avro always deserialize successfully even though the type isn't matched + // so no need to throw exception about which field can't be deserialized + row.setField(i, fieldConverters[i].convert(record.get(i))); + } + return row; + }; + } + + /** Creates a runtime converter which is null safe. */ + private static AvroToRowDataConverter createNullableConverter( + LogicalType type, boolean legacyTimestampMapping) { + final AvroToRowDataConverter converter = createConverter(type, legacyTimestampMapping); + return avroObject -> { + if (avroObject == null) { + return null; + } + return converter.convert(avroObject); + }; + } + + /** Creates a runtime converter which assuming input object is not null. */ + private static AvroToRowDataConverter createConverter( + LogicalType type, boolean legacyTimestampMapping) { + switch (type.getTypeRoot()) { + case NULL: + return avroObject -> null; + case TINYINT: + return avroObject -> ((Integer) avroObject).byteValue(); + case SMALLINT: + return avroObject -> ((Integer) avroObject).shortValue(); + case BOOLEAN: // boolean + case INTEGER: // int + case INTERVAL_YEAR_MONTH: // long + case BIGINT: // long + case INTERVAL_DAY_TIME: // long + case FLOAT: // float + case DOUBLE: // double + return avroObject -> avroObject; + case DATE: + return AvroToRowDataConverters::convertToDate; + case TIME_WITHOUT_TIME_ZONE: + return AvroToRowDataConverters::convertToTime; + case TIMESTAMP_WITHOUT_TIME_ZONE: + return avroObject -> convertToTimestamp(avroObject, type); + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + if (legacyTimestampMapping) { + throw new UnsupportedOperationException("Unsupported type: " + type); + } else { + return avroObject -> convertToTimestamp(avroObject, type); + } + case CHAR: + case VARCHAR: + return avroObject -> StringData.fromString(avroObject.toString()); + case BINARY: + case VARBINARY: + return AvroToRowDataConverters::convertToBytes; + case DECIMAL: + return createDecimalConverter((DecimalType) type); + case ARRAY: + return createArrayConverter((ArrayType) type, legacyTimestampMapping); + case ROW: + return createRowConverter((RowType) type); + case MAP: + case MULTISET: + return createMapConverter(type, legacyTimestampMapping); + case RAW: + default: + throw new UnsupportedOperationException("Unsupported type: " + type); + } + } + + private static AvroToRowDataConverter createDecimalConverter(DecimalType decimalType) { + final int precision = decimalType.getPrecision(); + final int scale = decimalType.getScale(); + return avroObject -> { + final byte[] bytes; + if (avroObject instanceof GenericFixed) { + bytes = ((GenericFixed) avroObject).bytes(); + } else if (avroObject instanceof ByteBuffer) { + ByteBuffer byteBuffer = (ByteBuffer) avroObject; + bytes = new byte[byteBuffer.remaining()]; + byteBuffer.get(bytes); + } else { + bytes = (byte[]) avroObject; + } + return DecimalData.fromUnscaledBytes(bytes, precision, scale); Review Comment: nit newline -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
