jonvex commented on code in PR #13654:
URL: https://github.com/apache/hudi/pull/13654#discussion_r2251741418
##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieArrayWritableAvroUtils.java:
##########
@@ -19,20 +19,356 @@
package org.apache.hudi.hadoop.utils;
+import org.apache.hudi.avro.AvroSchemaUtils;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieAvroSchemaException;
+import org.apache.hudi.exception.SchemaCompatibilityException;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.avro.JsonProperties;
+import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.serde2.avro.AvroSerdeException;
+import org.apache.hadoop.hive.serde2.avro.HiveTypeUtils;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.HiveDecimalUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.Deque;
+import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
+import java.util.Objects;
import java.util.function.UnaryOperator;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.avro.AvroSchemaUtils.isNullable;
+import static org.apache.hudi.avro.HoodieAvroUtils.createFullName;
+import static org.apache.hudi.avro.HoodieAvroUtils.createNamePrefix;
+import static org.apache.hudi.avro.HoodieAvroUtils.getOldFieldNameWithRenaming;
+import static org.apache.hudi.avro.HoodieAvroUtils.toJavaDate;
+import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
public class HoodieArrayWritableAvroUtils {
+ public static ArrayWritable rewriteRecordWithNewSchema(ArrayWritable
writable, Schema oldSchema, Schema newSchema, Map<String, String> renameCols) {
+ return (ArrayWritable) rewriteRecordWithNewSchema(writable, oldSchema,
newSchema, renameCols, new LinkedList<>());
+ }
+
+ private static Writable rewriteRecordWithNewSchema(Writable writable, Schema
oldAvroSchema, Schema newAvroSchema, Map<String, String> renameCols,
Deque<String> fieldNames) {
+ if (writable == null) {
+ return null;
+ }
+ Schema oldSchema = AvroSchemaUtils.resolveNullableSchema(oldAvroSchema);
+ Schema newSchema = AvroSchemaUtils.resolveNullableSchema(newAvroSchema);
+ if (oldSchema.equals(newSchema)) {
+ return writable;
+ }
+ return rewriteRecordWithNewSchemaInternal(writable, oldSchema, newSchema,
renameCols, fieldNames);
+ }
+
+ private static Writable rewriteRecordWithNewSchemaInternal(Writable
writable, Schema oldSchema, Schema newSchema, Map<String, String> renameCols,
Deque<String> fieldNames) {
+ switch (newSchema.getType()) {
+ case RECORD:
+ if (!(writable instanceof ArrayWritable)) {
+ throw new SchemaCompatibilityException(String.format("Cannot rewrite
%s as a record", writable.getClass().getName()));
+ }
+
+ ArrayWritable arrayWritable = (ArrayWritable) writable;
+ List<Schema.Field> fields = newSchema.getFields();
+ // projection will keep the size from the "from" schema because it
gets recycled
+ // and if the size changes the reader will fail
+ boolean noFieldsRenaming = renameCols.isEmpty();
+ String namePrefix = createNamePrefix(noFieldsRenaming, fieldNames);
+ Writable[] values = new Writable[Math.max(fields.size(),
arrayWritable.get().length)];
+ for (int i = 0; i < fields.size(); i++) {
+ Schema.Field newField = newSchema.getFields().get(i);
+ String newFieldName = newField.name();
+ fieldNames.push(newFieldName);
+ Schema.Field oldField = noFieldsRenaming
+ ? oldSchema.getField(newFieldName)
+ : oldSchema.getField(getOldFieldNameWithRenaming(namePrefix,
newFieldName, renameCols));
+ if (oldField != null) {
+ values[i] =
rewriteRecordWithNewSchema(arrayWritable.get()[oldField.pos()],
oldField.schema(), newField.schema(), renameCols, fieldNames);
+ } else if (newField.defaultVal() instanceof JsonProperties.Null) {
+ values[i] = NullWritable.get();
+ } else if (!isNullable(newField.schema()) && newField.defaultVal()
== null) {
+ throw new SchemaCompatibilityException("Field " +
createFullName(fieldNames) + " has no default value and is non-nullable");
+ } else if (newField.defaultVal() != null) {
+ switch
(AvroSchemaUtils.resolveNullableSchema(newField.schema()).getType()) {
+ case BOOLEAN:
+ values[i] = new BooleanWritable((Boolean)
newField.defaultVal());
+ break;
+ case INT:
+ values[i] = new IntWritable((Integer) newField.defaultVal());
+ break;
+ case LONG:
+ values[i] = new LongWritable((Long) newField.defaultVal());
+ break;
+ case FLOAT:
+ values[i] = new FloatWritable((Float) newField.defaultVal());
+ break;
+ case DOUBLE:
+ values[i] = new DoubleWritable((Double) newField.defaultVal());
+ break;
+ case STRING:
+ values[i] = new Text(newField.defaultVal().toString());
+ break;
+ default:
+ throw new SchemaCompatibilityException("Field " +
createFullName(fieldNames) + " has no default value");
+ }
+ }
+ fieldNames.pop();
+ }
+ return new ArrayWritable(Writable.class, values);
+
+ case ENUM:
+ if ((writable instanceof BytesWritable)) {
+ return writable;
+ }
+ if (oldSchema.getType() != Schema.Type.STRING && oldSchema.getType()
!= Schema.Type.ENUM) {
+ throw new SchemaCompatibilityException(String.format("Only ENUM or
STRING type can be converted ENUM type. Schema type was %s",
oldSchema.getType().getName()));
+ }
+ if (oldSchema.getType() == Schema.Type.STRING) {
+ return new BytesWritable(((Text) writable).copyBytes());
+ }
+ return writable;
+ case ARRAY:
+ if (!(writable instanceof ArrayWritable)) {
+ throw new SchemaCompatibilityException(String.format("Cannot rewrite
%s as an array", writable.getClass().getName()));
+ }
+ ArrayWritable array = (ArrayWritable) writable;
+ fieldNames.push("element");
+ for (int i = 0; i < array.get().length; i++) {
+ array.get()[i] = rewriteRecordWithNewSchema(array.get()[i],
oldSchema.getElementType(), newSchema.getElementType(), renameCols, fieldNames);
+ }
+ fieldNames.pop();
+ return array;
+ case MAP:
+ if (!(writable instanceof ArrayWritable)) {
+ throw new SchemaCompatibilityException(String.format("Cannot rewrite
%s as a map", writable.getClass().getName()));
+ }
+ ArrayWritable map = (ArrayWritable) writable;
+ fieldNames.push("value");
+ for (int i = 0; i < map.get().length; i++) {
+ Writable mapEntry = map.get()[i];
+ ((ArrayWritable) mapEntry).get()[1] =
rewriteRecordWithNewSchema(((ArrayWritable) mapEntry).get()[1],
oldSchema.getValueType(), newSchema.getValueType(), renameCols, fieldNames);
+ }
+ return map;
+
+ case UNION:
+ throw new IllegalArgumentException("should not be here?");
+
+ default:
+ return rewritePrimaryType(writable, oldSchema, newSchema);
+ }
+ }
+
+ public static Writable rewritePrimaryType(Writable writable, Schema
oldSchema, Schema newSchema) {
+ if (oldSchema.getType() == newSchema.getType()) {
+ switch (oldSchema.getType()) {
+ case NULL:
+ case BOOLEAN:
+ case INT:
+ case LONG:
+ case FLOAT:
+ case DOUBLE:
+ case BYTES:
+ case STRING:
+ return writable;
+ case FIXED:
+ if (oldSchema.getFixedSize() != newSchema.getFixedSize()) {
+ // Check whether this is a [[Decimal]]'s precision change
+ if (oldSchema.getLogicalType() instanceof LogicalTypes.Decimal) {
+ LogicalTypes.Decimal decimal = (LogicalTypes.Decimal)
oldSchema.getLogicalType();
+ return
HiveDecimalUtils.enforcePrecisionScale((HiveDecimalWritable) writable, new
DecimalTypeInfo(decimal.getPrecision(), decimal.getScale()));
+ } else {
+ throw new HoodieAvroSchemaException("Fixed type size change is
not currently supported");
+ }
+ }
+
+ // For [[Fixed]] data type both size and name have to match
+ //
+ // NOTE: That for values wrapped into [[Union]], to make sure that
reverse lookup (by
+ // full-name) is working we have to make sure that both
schema's name and namespace
+ // do match
+ if (Objects.equals(oldSchema.getFullName(),
newSchema.getFullName())) {
+ return writable;
+ } else {
+ LogicalTypes.Decimal decimal = (LogicalTypes.Decimal)
oldSchema.getLogicalType();
+ return
HiveDecimalUtils.enforcePrecisionScale((HiveDecimalWritable) writable, new
DecimalTypeInfo(decimal.getPrecision(), decimal.getScale()));
+ }
+
+ default:
+ throw new HoodieAvroSchemaException("Unknown schema type: " +
newSchema.getType());
+ }
+ } else {
+ return rewritePrimaryTypeWithDiffSchemaType(writable, oldSchema,
newSchema);
+ }
+ }
+
+ private static Writable rewritePrimaryTypeWithDiffSchemaType(Writable
writable, Schema oldSchema, Schema newSchema) {
+ switch (newSchema.getType()) {
+ case NULL:
+ case BOOLEAN:
+ break;
+ case INT:
+ if (newSchema.getLogicalType() == LogicalTypes.date() &&
oldSchema.getType() == Schema.Type.STRING) {
+ return
HoodieHiveUtils.getDateWriteable((HoodieAvroUtils.fromJavaDate(java.sql.Date.valueOf(writable.toString()))));
+ }
+ break;
+ case LONG:
+ if (oldSchema.getType() == Schema.Type.INT) {
+ return new LongWritable(((IntWritable) writable).get());
+ }
+ break;
+ case FLOAT:
+ if ((oldSchema.getType() == Schema.Type.INT)
+ || (oldSchema.getType() == Schema.Type.LONG)) {
+ return oldSchema.getType() == Schema.Type.INT
+ ? new FloatWritable(((IntWritable) writable).get())
+ : new FloatWritable(((LongWritable) writable).get());
+ }
+ break;
+ case DOUBLE:
+ if (oldSchema.getType() == Schema.Type.FLOAT) {
+ // java float cannot convert to double directly, deal with float
precision change
+ return new DoubleWritable(Double.parseDouble(((FloatWritable)
writable).get() + ""));
+ } else if (oldSchema.getType() == Schema.Type.INT) {
+ return new DoubleWritable(((IntWritable) writable).get());
+ } else if (oldSchema.getType() == Schema.Type.LONG) {
+ return new DoubleWritable(((LongWritable) writable).get());
+ }
+ break;
+ case BYTES:
+ if (oldSchema.getType() == Schema.Type.STRING) {
+ return new BytesWritable(getUTF8Bytes(writable.toString()));
+ }
+ break;
+ case STRING:
+ if (oldSchema.getType() == Schema.Type.ENUM) {
+ return writable;
+ }
+ if (oldSchema.getType() == Schema.Type.BYTES) {
+ return new Text(StringUtils.fromUTF8Bytes(((BytesWritable)
writable).getBytes()));
+ }
+ if (oldSchema.getLogicalType() == LogicalTypes.date()) {
+ return new Text(toJavaDate(((IntWritable)
writable).get()).toString());
+ }
+ if (oldSchema.getType() == Schema.Type.INT
+ || oldSchema.getType() == Schema.Type.LONG
+ || oldSchema.getType() == Schema.Type.FLOAT
+ || oldSchema.getType() == Schema.Type.DOUBLE) {
+ return new Text(writable.toString());
+ }
+ if (oldSchema.getType() == Schema.Type.FIXED &&
oldSchema.getLogicalType() instanceof LogicalTypes.Decimal) {
+ HiveDecimalWritable hdw = (HiveDecimalWritable) writable;
+ return new
Text(hdw.getHiveDecimal().bigDecimalValue().toPlainString());
+ }
+ break;
+ case FIXED:
+ if (newSchema.getLogicalType() instanceof LogicalTypes.Decimal) {
+ LogicalTypes.Decimal decimal = (LogicalTypes.Decimal)
newSchema.getLogicalType();
+ DecimalTypeInfo decimalTypeInfo = new
DecimalTypeInfo(decimal.getPrecision(), decimal.getScale());
+
+ if (oldSchema.getType() == Schema.Type.STRING
+ || oldSchema.getType() == Schema.Type.INT
+ || oldSchema.getType() == Schema.Type.LONG
+ || oldSchema.getType() == Schema.Type.FLOAT
+ || oldSchema.getType() == Schema.Type.DOUBLE) {
+ // loses trailing zeros due to hive issue. Since we only read with
hive, I think this is fine
+ HiveDecimalWritable converted = new
HiveDecimalWritable(HiveDecimal.create(new
java.math.BigDecimal(writable.toString())));
+ return HiveDecimalUtils.enforcePrecisionScale(converted,
decimalTypeInfo);
+ }
+
+ if (oldSchema.getType() == Schema.Type.BYTES) {
+ ByteBuffer buffer = ByteBuffer.wrap(((BytesWritable)
writable).getBytes());
+ BigDecimal bd = new BigDecimal(new BigInteger(buffer.array()),
decimal.getScale());
+ HiveDecimalWritable converted = new
HiveDecimalWritable(HiveDecimal.create(bd));
+ return HiveDecimalUtils.enforcePrecisionScale(converted,
decimalTypeInfo);
+ }
+ }
+ break;
+ default:
+ }
+ throw new HoodieAvroSchemaException(String.format("cannot support rewrite
value for schema type: %s since the old schema type is: %s", newSchema,
oldSchema));
+ }
+
+ private static final Cache<Schema, ArrayWritableObjectInspector>
OBJECT_INSPECTOR_CACHE =
Review Comment:
   Previously we used a hack to get the column names and types from the
configs, and then looked up the types in a map keyed by the schema's column
names. With schema evolution that no longer works, so we need a proper way of
converting an Avro schema to an object inspector. Now we have TypeInfoFactory,
which we can use to do that.
   I'm very confident that constructing the object inspectors this way is
better. I'm not insistent that the cache live for the life of the JVM, but
there is no longer a need to throw everything out each time we read a new
Parquet file.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]