autumnust commented on a change in pull request #2966: URL: https://github.com/apache/incubator-gobblin/pull/2966#discussion_r419787662
########## File path: gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtils.java ########## @@ -156,4 +184,386 @@ static boolean isEvolutionValid(TypeDescription fileType, TypeDescription reader return ConvertTreeReaderFactory.canConvert(fileType, readerType); } } + + /** + * Fill in value in OrcStruct with given schema, assuming {@param w} contains the same schema as {@param schema}. + * {@param schema} is still necessary to given given {@param w} do contains schema information itself, because the + * actual value type is only available in {@link TypeDescription} but not {@link org.apache.orc.mapred.OrcValue}. + * + * For simplicity here are some assumptions: + * - We only give 3 primitive values and use them to construct compound values. To make it work for different types that + * can be widened or shrunk to each other, please use value within small range. + * - For List, Map or Union, make sure there's at least one entry within the record-container. + * you may want to try {@link #createValueRecursively(TypeDescription)} instead of {@link OrcStruct#createValue(TypeDescription)} + */ + public static void orcStructFillerWithFixedValue(WritableComparable w, TypeDescription schema, int unionTag, + int intValue, String stringValue, boolean booleanValue) { + switch (schema.getCategory()) { + case BOOLEAN: + ((BooleanWritable) w).set(booleanValue); + break; + case BYTE: + ((ByteWritable) w).set((byte) intValue); + break; + case SHORT: + ((ShortWritable) w).set((short) intValue); + break; + case INT: + ((IntWritable) w).set(intValue); + break; + case LONG: + ((LongWritable) w).set(intValue); + break; + case FLOAT: + ((FloatWritable) w).set(intValue * 1.0f); + break; + case DOUBLE: + ((DoubleWritable) w).set(intValue * 1.0); + break; + case STRING: + case CHAR: + case VARCHAR: + ((Text) w).set(stringValue); + break; + case BINARY: + throw new UnsupportedOperationException("Binary type is not supported in random orc data filler"); + case 
DECIMAL: + throw new UnsupportedOperationException("Decimal type is not supported in random orc data filler"); + case DATE: + case TIMESTAMP: + case TIMESTAMP_INSTANT: + throw new UnsupportedOperationException( + "Timestamp and its derived types is not supported in random orc data filler"); + case LIST: + OrcList castedList = (OrcList) w; + // Here it is not trivial to create typed-object in element-type. So this method expect the value container + // to at least contain one element, or the traversing within the list will be skipped. + for (Object i : castedList) { + orcStructFillerWithFixedValue((WritableComparable) i, schema.getChildren().get(0), unionTag, intValue, + stringValue, booleanValue); + } + break; + case MAP: + OrcMap castedMap = (OrcMap) w; + for (Object entry : castedMap.entrySet()) { + Map.Entry<WritableComparable, WritableComparable> castedEntry = + (Map.Entry<WritableComparable, WritableComparable>) entry; + orcStructFillerWithFixedValue(castedEntry.getKey(), schema.getChildren().get(0), unionTag, intValue, + stringValue, booleanValue); + orcStructFillerWithFixedValue(castedEntry.getValue(), schema.getChildren().get(1), unionTag, intValue, + stringValue, booleanValue); + } + break; + case STRUCT: + OrcStruct castedStruct = (OrcStruct) w; + int fieldIdx = 0; + for (TypeDescription child : schema.getChildren()) { + orcStructFillerWithFixedValue(castedStruct.getFieldValue(fieldIdx), child, unionTag, intValue, stringValue, + booleanValue); + fieldIdx += 1; + } + break; + case UNION: + OrcUnion castedUnion = (OrcUnion) w; + TypeDescription targetMemberSchema = schema.getChildren().get(unionTag); + castedUnion.set(unionTag, createValueRecursively(targetMemberSchema)); + orcStructFillerWithFixedValue((WritableComparable) castedUnion.getObject(), targetMemberSchema, unionTag, + intValue, stringValue, booleanValue); + break; + default: + throw new IllegalArgumentException("Unknown type " + schema.toString()); + } + } + + /** + * The simple API: Union tag 
by default set to 0. + */ + public static void orcStructFillerWithFixedValue(WritableComparable w, TypeDescription schema, int intValue, + String stringValue, boolean booleanValue) { + orcStructFillerWithFixedValue(w, schema, 0, intValue, stringValue, booleanValue); + } + + /** + * Suppress the warning of type checking: All casts are clearly valid as they are all (sub)elements Orc types. + * Check failure will trigger Cast exception and blow up the process. + */ + @SuppressWarnings("unchecked") + private static WritableComparable structConversionHelper(WritableComparable w, WritableComparable v, + TypeDescription targetSchema) { + + if (w instanceof OrcStruct) { + upConvertOrcStruct((OrcStruct) w, (OrcStruct) v, targetSchema); + } else if (w instanceof OrcList) { + OrcList castedList = (OrcList) w; + OrcList targetList = (OrcList) v; + TypeDescription elementType = targetSchema.getChildren().get(0); + WritableComparable targetListRecordContainer = + targetList.size() > 0 ? (WritableComparable) targetList.get(0) : createValueRecursively(elementType, 0); + targetList.clear(); + + for (int i = 0; i < castedList.size(); i++) { + targetList.add(i, + structConversionHelper((WritableComparable) castedList.get(i), targetListRecordContainer, elementType)); + } + } else if (w instanceof OrcMap) { + OrcMap castedMap = (OrcMap) w; + OrcMap targetMap = (OrcMap) v; + TypeDescription valueSchema = targetSchema.getChildren().get(1); + + // Create recordContainer with the schema of value. + Iterator targetMapEntries = targetMap.values().iterator(); + WritableComparable targetMapRecordContainer = + targetMapEntries.hasNext() ? 
(WritableComparable) targetMapEntries.next() + : createValueRecursively(valueSchema); + + targetMap.clear(); + + for (Object entry : castedMap.entrySet()) { + Map.Entry<WritableComparable, WritableComparable> castedEntry = + (Map.Entry<WritableComparable, WritableComparable>) entry; + + targetMapRecordContainer = + structConversionHelper(castedEntry.getValue(), targetMapRecordContainer, valueSchema); + targetMap.put(castedEntry.getKey(), targetMapRecordContainer); + } + } else if (w instanceof OrcUnion) { + OrcUnion castedUnion = (OrcUnion) w; + OrcUnion targetUnion = (OrcUnion) v; + byte tag = castedUnion.getTag(); + + // ORC doesn't support Union type widening + // Avro doesn't allow it either, reference: https://avro.apache.org/docs/current/spec.html#Schema+Resolution + // As a result, member schema within source and target should be identical. + TypeDescription targetMemberSchema = targetSchema.getChildren().get(tag); + targetUnion.set(tag, structConversionHelper((WritableComparable) castedUnion.getObject(), + (WritableComparable) OrcUtils.createValueRecursively(targetMemberSchema), targetMemberSchema)); + } else { + // Regardless whether type-widening is happening or not, this method copy the value of w into v. + handlePrimitiveWritableComparable(w, v); + } + + // If non-primitive or type-widening is required, v should already be populated by w's value recursively. + return v; + } + + /** + * Recursively convert the {@param oldStruct} into {@param newStruct} whose schema is {@param targetSchema}. + * This serves similar purpose like GenericDatumReader for Avro, which accepts an reader schema and writer schema + * to allow users convert bytes into reader's schema in a compatible approach. + * Calling this method SHALL NOT cause any side-effect for {@param oldStruct}, also it will copy value of each fields + * in {@param oldStruct} into {@param newStruct} recursively. 
Please ensure avoiding unnecessary call as it could + * be pretty expensive if the struct schema is complicated, or contains container objects like array/map. + * + * Note that if newStruct containing things like List/Map (container-type), the up-conversion is doing two things: + * 1. Clear all elements in original containers. + * 2. Make value of container elements in {@param oldStruct} is populated into {@param newStruct} with element-type + * in {@param newStruct} if compatible. + * + * Limitation: + * 1. Does not support up-conversion of key types in Maps. The underlying reasoning is because of the primary format + * from upstream is Avro, which enforces key-type to be string only. + * 2. Conversion from a field A to field B only happens if + * org.apache.gobblin.compaction.mapreduce.orc.OrcValueMapper#isEvolutionValid(A,B) return true. + */ + @VisibleForTesting + public static void upConvertOrcStruct(OrcStruct oldStruct, OrcStruct newStruct, TypeDescription targetSchema) { + + // If target schema is not equal to newStruct's schema, it is a illegal state and doesn't make sense to work through. 
+ Preconditions.checkArgument(newStruct.getSchema().equals(targetSchema)); + + int indexInNewSchema = 0; + List<String> oldSchemaFieldNames = oldStruct.getSchema().getFieldNames(); + List<TypeDescription> oldSchemaTypes = oldStruct.getSchema().getChildren(); + List<TypeDescription> newSchemaTypes = targetSchema.getChildren(); + + for (String fieldName : targetSchema.getFieldNames()) { + if (oldSchemaFieldNames.contains(fieldName) && oldStruct.getFieldValue(fieldName) != null) { + int fieldIndex = oldSchemaFieldNames.indexOf(fieldName); + + TypeDescription oldFieldSchema = oldSchemaTypes.get(fieldIndex); + TypeDescription newFieldSchema = newSchemaTypes.get(indexInNewSchema); + + if (isEvolutionValid(oldFieldSchema, newFieldSchema)) { + WritableComparable oldField = oldStruct.getFieldValue(fieldName); + WritableComparable newField = newStruct.getFieldValue(fieldName); + newField = (newField == null) ? OrcUtils.createValueRecursively(newFieldSchema) : newField; + newStruct.setFieldValue(fieldName, structConversionHelper(oldField, newField, newFieldSchema)); + } else { + throw new SchemaEvolution.IllegalEvolutionException(String + .format("ORC does not support type conversion from file" + " type %s to reader type %s ", + oldFieldSchema.toString(), newFieldSchema.toString())); + } + } else { + newStruct.setFieldValue(fieldName, null); + } + + indexInNewSchema++; + } + } + + /** + * For primitive types of {@link WritableComparable}, supporting ORC-allowed type-widening. 
+ */ + public static void handlePrimitiveWritableComparable(WritableComparable from, WritableComparable to) { + if (from instanceof ByteWritable) { + if (to instanceof ByteWritable) { + ((ByteWritable) to).set(((ByteWritable) from).get()); + return; + } else if (to instanceof ShortWritable) { + ((ShortWritable) to).set(((ByteWritable) from).get()); + return; + } else if (to instanceof IntWritable) { + ((IntWritable) to).set(((ByteWritable) from).get()); + return; + } else if (to instanceof LongWritable) { + ((LongWritable) to).set(((ByteWritable) from).get()); + return; + } else if (to instanceof DoubleWritable) { + ((DoubleWritable) to).set(((ByteWritable) from).get()); + return; + } + } else if (from instanceof ShortWritable) { + if (to instanceof ShortWritable) { + ((ShortWritable) to).set(((ShortWritable) from).get()); + return; + } else if (to instanceof IntWritable) { + ((IntWritable) to).set(((ShortWritable) from).get()); + return; + } else if (to instanceof LongWritable) { + ((LongWritable) to).set(((ShortWritable) from).get()); + return; + } else if (to instanceof DoubleWritable) { + ((DoubleWritable) to).set(((ShortWritable) from).get()); + return; + } + } else if (from instanceof IntWritable) { + if (to instanceof IntWritable) { + ((IntWritable) to).set(((IntWritable) from).get()); + return; + } else if (to instanceof LongWritable) { + ((LongWritable) to).set(((IntWritable) from).get()); + return; + } else if (to instanceof DoubleWritable) { + ((DoubleWritable) to).set(((IntWritable) from).get()); + return; + } + } else if (from instanceof LongWritable) { + if (to instanceof LongWritable) { + ((LongWritable) to).set(((LongWritable) from).get()); + return; + } else if (to instanceof DoubleWritable) { + ((DoubleWritable) to).set(((LongWritable) from).get()); + return; + } + // Following from this branch, type-widening is not allowed and only value-copy will happen. 
+ } else if (from instanceof DoubleWritable) { + if (to instanceof DoubleWritable) { + ((DoubleWritable) to).set(((DoubleWritable) from).get()); + return; + } + } else if (from instanceof BytesWritable) { + if (to instanceof BytesWritable) { + ((BytesWritable) to).set((BytesWritable) from); + return; + } + } else if (from instanceof FloatWritable) { + if (to instanceof FloatWritable) { + ((FloatWritable) to).set(((FloatWritable) from).get()); + return; + } + } else if (from instanceof Text) { + if (to instanceof Text) { + ((Text) to).set((Text) from); + return; + } + } else if (from instanceof DateWritable) { + if (to instanceof DateWritable) { + ((DateWritable) to).set(((DateWritable) from).get()); + return; + } + } else if (from instanceof OrcTimestamp && to instanceof OrcTimestamp) { + ((OrcTimestamp) to).set(((OrcTimestamp) from).toString()); + return; + } else if (from instanceof HiveDecimalWritable && to instanceof HiveDecimalWritable) { + ((HiveDecimalWritable) to).set(((HiveDecimalWritable) from).getHiveDecimal()); + return; + } + throw new UnsupportedOperationException(String + .format("The conversion of primitive-type WritableComparable object from %s to %s is not supported", + from.getClass(), to.getClass())); + } + + /** + * For nested structure like struct<a:array<struct<int,string>>>, calling OrcStruct.createValue doesn't create entry for the inner + * list, which would be required to assign a value if the entry-type has nested structure, or it just cannot see the + * entry's nested structure. + * + * This function should be fed back to open-source ORC. Review comment: A side note: I think this could actually be the reason why we are seeing a schema evolution failure with complex nested schemas in the ORC reader. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org