TeRS-K commented on a change in pull request #2793: URL: https://github.com/apache/hudi/pull/2793#discussion_r612952338
########## File path: hudi-common/src/main/java/org/apache/hudi/common/util/AvroOrcUtils.java ########## @@ -0,0 +1,804 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.util; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.sql.Timestamp; +import java.util.Base64; +import java.util.Date; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.avro.Conversions; +import org.apache.avro.LogicalType; +import org.apache.avro.LogicalTypes; +import org.apache.avro.Schema.Field; +import org.apache.avro.generic.GenericData; +import java.nio.charset.StandardCharsets; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData.StringType; +import org.apache.avro.util.Utf8; +import org.apache.orc.storage.common.type.HiveDecimal; +import org.apache.orc.storage.ql.exec.vector.BytesColumnVector; +import org.apache.orc.storage.ql.exec.vector.ColumnVector; +import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector; +import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector; +import 
org.apache.orc.storage.ql.exec.vector.ListColumnVector; +import org.apache.orc.storage.ql.exec.vector.LongColumnVector; +import org.apache.orc.storage.ql.exec.vector.MapColumnVector; +import org.apache.orc.storage.ql.exec.vector.StructColumnVector; +import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector; +import org.apache.orc.storage.ql.exec.vector.UnionColumnVector; +import org.apache.orc.storage.serde2.io.DateWritable; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.orc.TypeDescription; + +/** + * Methods including addToVector, addUnionValue, createOrcSchema are originally from + * https://github.com/streamsets/datacollector. + * Source classes: + * - com.streamsets.pipeline.lib.util.avroorc.AvroToOrcRecordConverter + * - com.streamsets.pipeline.lib.util.avroorc.AvroToOrcSchemaConverter + * + * Changes made: + * 1. Flatten nullable Avro schema type when the value is not null in `addToVector`. + * 2. Use getLogicalType(), constants from LogicalTypes instead of getJsonProp() to handle Avro logical types. + */ +public class AvroOrcUtils { + + private static final int MICROS_PER_MILLI = 1000; + private static final int NANOS_PER_MICRO = 1000; + + /** + * Add an object (of a given ORC type) to the column vector at a given position. + * + * @param type ORC schema of the value Object. + * @param colVector The column vector to store the value Object. + * @param avroSchema Avro schema of the value Object. + * Only used to check logical types for timestamp unit conversion. + * @param value Object to be added to the column vector + * @param vectorPos The position in the vector where value will be stored at. 
+ */ + public static void addToVector(TypeDescription type, ColumnVector colVector, Schema avroSchema, Object value, int vectorPos) { + + final int currentVecLength = colVector.isNull.length; + if (vectorPos >= currentVecLength) { + colVector.ensureSize(2 * currentVecLength, true); + } + if (value == null) { + colVector.isNull[vectorPos] = true; + colVector.noNulls = false; + return; + } + + if (avroSchema.getType().equals(Schema.Type.UNION)) { + avroSchema = getActualSchemaType(avroSchema); + } + + LogicalType logicalType = avroSchema != null ? avroSchema.getLogicalType() : null; + + switch (type.getCategory()) { + case BOOLEAN: + LongColumnVector boolVec = (LongColumnVector) colVector; + boolVec.vector[vectorPos] = (boolean) value ? 1 : 0; + break; + case BYTE: + LongColumnVector byteColVec = (LongColumnVector) colVector; + byteColVec.vector[vectorPos] = (byte) value; + break; + case SHORT: + LongColumnVector shortColVec = (LongColumnVector) colVector; + shortColVec.vector[vectorPos] = (short) value; + break; + case INT: + // the Avro logical type could be AvroTypeUtil.LOGICAL_TYPE_TIME_MILLIS, but we will ignore that fact here + // since Orc has no way to represent a time in the way Avro defines it; we will simply preserve the int value + LongColumnVector intColVec = (LongColumnVector) colVector; + intColVec.vector[vectorPos] = (int) value; + break; + case LONG: + // the Avro logical type could be AvroTypeUtil.LOGICAL_TYPE_TIME_MICROS, but we will ignore that fact here + // since Orc has no way to represent a time in the way Avro defines it; we will simply preserve the long value + LongColumnVector longColVec = (LongColumnVector) colVector; + longColVec.vector[vectorPos] = (long) value; + break; + case FLOAT: + DoubleColumnVector floatColVec = (DoubleColumnVector) colVector; + floatColVec.vector[vectorPos] = (float) value; + break; + case DOUBLE: + DoubleColumnVector doubleColVec = (DoubleColumnVector) colVector; + doubleColVec.vector[vectorPos] = (double) 
value; + break; + case VARCHAR: + case CHAR: + case STRING: + BytesColumnVector bytesColVec = (BytesColumnVector) colVector; + byte[] bytes = null; + + if (value instanceof String) { + bytes = ((String) value).getBytes(StandardCharsets.UTF_8); + } else if (value instanceof Utf8) { + final Utf8 utf8 = (Utf8) value; + bytes = utf8.getBytes(); + } else if (value instanceof GenericData.EnumSymbol) { + bytes = ((GenericData.EnumSymbol) value).toString().getBytes(StandardCharsets.UTF_8); + } else { + throw new IllegalStateException(String.format( + "Unrecognized type for Avro %s field value, which has type %s, value %s", + type.getCategory().getName(), + value.getClass().getName(), + value.toString() + )); + } + + if (bytes == null) { + bytesColVec.isNull[vectorPos] = true; + bytesColVec.noNulls = false; + } else { + bytesColVec.setRef(vectorPos, bytes, 0, bytes.length); + } + break; + case DATE: + LongColumnVector dateColVec = (LongColumnVector) colVector; + int daysSinceEpoch; + if (logicalType instanceof LogicalTypes.Date) { + daysSinceEpoch = (int) value; + } else if (value instanceof java.sql.Date) { + daysSinceEpoch = DateWritable.dateToDays((java.sql.Date) value); + } else if (value instanceof Date) { + daysSinceEpoch = DateWritable.millisToDays(((Date) value).getTime()); + } else { + throw new IllegalStateException(String.format( + "Unrecognized type for Avro DATE field value, which has type %s, value %s", + value.getClass().getName(), + value.toString() + )); + } + dateColVec.vector[vectorPos] = daysSinceEpoch; + break; + case TIMESTAMP: + TimestampColumnVector tsColVec = (TimestampColumnVector) colVector; + + long time; + int nanos = 0; + + // The unit for Timestamp in ORC is millis, convert timestamp to millis if needed + if (logicalType instanceof LogicalTypes.TimestampMillis) { + time = (long) value; + } else if (logicalType instanceof LogicalTypes.TimestampMicros) { + final long logicalTsValue = (long) value; + time = logicalTsValue / MICROS_PER_MILLI; + 
nanos = NANOS_PER_MICRO * ((int) (logicalTsValue % MICROS_PER_MILLI)); + } else if (value instanceof Timestamp) { + Timestamp tsValue = (Timestamp) value; + time = tsValue.getTime(); + nanos = tsValue.getNanos(); + } else if (value instanceof java.sql.Date) { + java.sql.Date sqlDateValue = (java.sql.Date) value; + time = sqlDateValue.getTime(); + } else if (value instanceof Date) { + Date dateValue = (Date) value; + time = dateValue.getTime(); + } else { + throw new IllegalStateException(String.format( + "Unrecognized type for Avro TIMESTAMP field value, which has type %s, value %s", + value.getClass().getName(), + value.toString() + )); + } + + final long millis = time % 1000; Review comment: Right, it seems we can remove this line and the couple of lines below it, as I don't see why the millis would need to be accounted for in the nanos portion. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
