TeRS-K commented on a change in pull request #2793:
URL: https://github.com/apache/hudi/pull/2793#discussion_r612952338



##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/util/AvroOrcUtils.java
##########
@@ -0,0 +1,804 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.sql.Timestamp;
+import java.util.Base64;
+import java.util.Date;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.avro.Conversions;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.generic.GenericData;
+import java.nio.charset.StandardCharsets;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData.StringType;
+import org.apache.avro.util.Utf8;
+import org.apache.orc.storage.common.type.HiveDecimal;
+import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
+import org.apache.orc.storage.ql.exec.vector.ColumnVector;
+import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;
+import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector;
+import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
+import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
+import org.apache.orc.storage.ql.exec.vector.MapColumnVector;
+import org.apache.orc.storage.ql.exec.vector.StructColumnVector;
+import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector;
+import org.apache.orc.storage.ql.exec.vector.UnionColumnVector;
+import org.apache.orc.storage.serde2.io.DateWritable;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.orc.TypeDescription;
+
+/**
+ * Methods including addToVector, addUnionValue, createOrcSchema are 
originally from
+ * https://github.com/streamsets/datacollector.
+ * Source classes:
+ * - com.streamsets.pipeline.lib.util.avroorc.AvroToOrcRecordConverter
+ * - com.streamsets.pipeline.lib.util.avroorc.AvroToOrcSchemaConverter
+ *
+ * Changes made:
+ * 1. Flatten nullable Avro schema type when the value is not null in 
`addToVector`.
+ * 2. Use getLogicalType(), constants from LogicalTypes instead of 
getJsonProp() to handle Avro logical types.
+ */
+public class AvroOrcUtils {
+
+  private static final int MICROS_PER_MILLI = 1000;
+  private static final int NANOS_PER_MICRO = 1000;
+
+  /**
+   * Add an object (of a given ORC type) to the column vector at a given 
position.
+   *
+   * @param type        ORC schema of the value Object.
+   * @param colVector   The column vector to store the value Object.
+   * @param avroSchema  Avro schema of the value Object.
+   *                    Only used to check logical types for timestamp unit 
conversion.
+   * @param value       Object to be added to the column vector
+   * @param vectorPos   The position in the vector where value will be stored 
at.
+   */
+  public static void addToVector(TypeDescription type, ColumnVector colVector, 
Schema avroSchema, Object value, int vectorPos) {
+
+    final int currentVecLength = colVector.isNull.length;
+    if (vectorPos >= currentVecLength) {
+      colVector.ensureSize(2 * currentVecLength, true);
+    }
+    if (value == null) {
+      colVector.isNull[vectorPos] = true;
+      colVector.noNulls = false;
+      return;
+    }
+
+    if (avroSchema.getType().equals(Schema.Type.UNION)) {
+      avroSchema = getActualSchemaType(avroSchema);
+    }
+
+    LogicalType logicalType = avroSchema != null ? avroSchema.getLogicalType() 
: null;
+
+    switch (type.getCategory()) {
+      case BOOLEAN:
+        LongColumnVector boolVec = (LongColumnVector) colVector;
+        boolVec.vector[vectorPos] = (boolean) value ? 1 : 0;
+        break;
+      case BYTE:
+        LongColumnVector byteColVec = (LongColumnVector) colVector;
+        byteColVec.vector[vectorPos] = (byte) value;
+        break;
+      case SHORT:
+        LongColumnVector shortColVec = (LongColumnVector) colVector;
+        shortColVec.vector[vectorPos] = (short) value;
+        break;
+      case INT:
+        // the Avro logical type could be 
AvroTypeUtil.LOGICAL_TYPE_TIME_MILLIS, but we will ignore that fact here
+        // since Orc has no way to represent a time in the way Avro defines 
it; we will simply preserve the int value
+        LongColumnVector intColVec = (LongColumnVector) colVector;
+        intColVec.vector[vectorPos] = (int) value;
+        break;
+      case LONG:
+        // the Avro logical type could be 
AvroTypeUtil.LOGICAL_TYPE_TIME_MICROS, but we will ignore that fact here
+        // since Orc has no way to represent a time in the way Avro defines 
it; we will simply preserve the long value
+        LongColumnVector longColVec = (LongColumnVector) colVector;
+        longColVec.vector[vectorPos] = (long) value;
+        break;
+      case FLOAT:
+        DoubleColumnVector floatColVec = (DoubleColumnVector) colVector;
+        floatColVec.vector[vectorPos] = (float) value;
+        break;
+      case DOUBLE:
+        DoubleColumnVector doubleColVec = (DoubleColumnVector) colVector;
+        doubleColVec.vector[vectorPos] = (double) value;
+        break;
+      case VARCHAR:
+      case CHAR:
+      case STRING:
+        BytesColumnVector bytesColVec = (BytesColumnVector) colVector;
+        byte[] bytes = null;
+
+        if (value instanceof String) {
+          bytes = ((String) value).getBytes(StandardCharsets.UTF_8);
+        } else if (value instanceof Utf8) {
+          final Utf8 utf8 = (Utf8) value;
+          bytes = utf8.getBytes();
+        } else if (value instanceof GenericData.EnumSymbol) {
+          bytes = ((GenericData.EnumSymbol) 
value).toString().getBytes(StandardCharsets.UTF_8);
+        } else {
+          throw new IllegalStateException(String.format(
+              "Unrecognized type for Avro %s field value, which has type %s, 
value %s",
+              type.getCategory().getName(),
+              value.getClass().getName(),
+              value.toString()
+          ));
+        }
+
+        if (bytes == null) {
+          bytesColVec.isNull[vectorPos] = true;
+          bytesColVec.noNulls = false;
+        } else {
+          bytesColVec.setRef(vectorPos, bytes, 0, bytes.length);
+        }
+        break;
+      case DATE:
+        LongColumnVector dateColVec = (LongColumnVector) colVector;
+        int daysSinceEpoch;
+        if (logicalType instanceof LogicalTypes.Date) {
+          daysSinceEpoch = (int) value;
+        } else if (value instanceof java.sql.Date) {
+          daysSinceEpoch = DateWritable.dateToDays((java.sql.Date) value);
+        } else if (value instanceof Date) {
+          daysSinceEpoch = DateWritable.millisToDays(((Date) value).getTime());
+        } else {
+          throw new IllegalStateException(String.format(
+              "Unrecognized type for Avro DATE field value, which has type %s, 
value %s",
+              value.getClass().getName(),
+              value.toString()
+          ));
+        }
+        dateColVec.vector[vectorPos] = daysSinceEpoch;
+        break;
+      case TIMESTAMP:
+        TimestampColumnVector tsColVec = (TimestampColumnVector) colVector;
+
+        long time;
+        int nanos = 0;
+
+        // The unit for Timestamp in ORC is millis, convert timestamp to 
millis if needed
+        if (logicalType instanceof LogicalTypes.TimestampMillis) {
+          time = (long) value;
+        } else if (logicalType instanceof LogicalTypes.TimestampMicros) {
+          final long logicalTsValue = (long) value;
+          time = logicalTsValue / MICROS_PER_MILLI;
+          nanos = NANOS_PER_MICRO * ((int) (logicalTsValue % 
MICROS_PER_MILLI));
+        } else if (value instanceof Timestamp) {
+          Timestamp tsValue = (Timestamp) value;
+          time = tsValue.getTime();
+          nanos = tsValue.getNanos();
+        } else if (value instanceof java.sql.Date) {
+          java.sql.Date sqlDateValue = (java.sql.Date) value;
+          time = sqlDateValue.getTime();
+        } else if (value instanceof Date) {
+          Date dateValue = (Date) value;
+          time = dateValue.getTime();
+        } else {
+          throw new IllegalStateException(String.format(
+              "Unrecognized type for Avro TIMESTAMP field value, which has 
type %s, value %s",
+              value.getClass().getName(),
+              value.toString()
+          ));
+        }
+
+        final long millis = time % 1000;

Review comment:
       Right, it seems like we can remove this line and the couple of lines below it, 
as I don't see why the millis should need to be accounted for in the nanos portion.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to