sv2000 commented on a change in pull request #2966: URL: https://github.com/apache/incubator-gobblin/pull/2966#discussion_r419130838
########## File path: gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/RecordKeyDedupReducerBase.java ########## @@ -88,21 +87,33 @@ protected void reduce(KI key, Iterable<VI> values, Context context) numVals++; } + writeRetainValue(valueToRetain, context); Review comment: writeRetainValue -> writeValuesToRetain ########## File path: gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionOrcJobConfigurator.java ########## @@ -36,6 +38,10 @@ public class CompactionOrcJobConfigurator extends CompactionJobConfigurator { + + public static final String ORC_MAPPER_SHUFFLE_SCHEMA_KEY = "orcMapperShuffleSchema"; Review comment: Did you mean orcMapperShuffleKeySchema? i.e. is it the schema of the map output key? If so, might be useful to add a javadoc comment to help orient the reader. ########## File path: gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/RecordKeyDedupReducerBase.java ########## @@ -88,21 +87,33 @@ protected void reduce(KI key, Iterable<VI> values, Context context) numVals++; } + writeRetainValue(valueToRetain, context); + updateCounter(numVals, context); Review comment: updateCounter -> updateCounters. ########## File path: gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcKeyDedupReducer.java ########## @@ -37,6 +53,30 @@ protected void setOutKey(OrcValue valueToRetain) { // do nothing since initReusableObject has assigned value for outKey. } + @Override + protected void reduce(OrcKey key, Iterable<OrcValue> values, Context context) + throws IOException, InterruptedException { + + /* Map from hash of value(Typed in OrcStruct) object to its times of duplication*/ + Map<Integer, Integer> valuesToRetain = new HashMap<>(); + int valueHash = 0; + + for (OrcValue value : values) { Review comment: I am not fully clear what is the objective here. 
Is it that we want to avoid using the entire record as the shuffle key and hence, we have to de-dupe based on the values? If so, what is the benefit of doing that? Seems like we would end up paying the cost of computing the hash code twice, once on the map output key to shuffle the map output and once for detecting value duplicates. ########## File path: gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtils.java ########## @@ -156,4 +184,386 @@ static boolean isEvolutionValid(TypeDescription fileType, TypeDescription reader return ConvertTreeReaderFactory.canConvert(fileType, readerType); } } + + /** + * Fill in value in OrcStruct with given schema, assuming {@param w} contains the same schema as {@param schema}. + * {@param schema} is still necessary to given given {@param w} do contains schema information itself, because the + * actual value type is only available in {@link TypeDescription} but not {@link org.apache.orc.mapred.OrcValue}. + * + * For simplicity here are some assumptions: + * - We only give 3 primitive values and use them to construct compound values. To make it work for different types that + * can be widened or shrunk to each other, please use value within small range. + * - For List, Map or Union, make sure there's at least one entry within the record-container. + * you may want to try {@link #createValueRecursively(TypeDescription)} instead of {@link OrcStruct#createValue(TypeDescription)} + */ + public static void orcStructFillerWithFixedValue(WritableComparable w, TypeDescription schema, int unionTag, Review comment: orcStructFillerWithFixedValue -> fillOrcStructWithFixedValue. Better to use verbs for method names. ########## File path: gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcKeyDedupReducer.java ########## @@ -37,6 +53,30 @@ protected void setOutKey(OrcValue valueToRetain) { // do nothing since initReusableObject has assigned value for outKey.
} + @Override + protected void reduce(OrcKey key, Iterable<OrcValue> values, Context context) + throws IOException, InterruptedException { + + /* Map from hash of value(Typed in OrcStruct) object to its times of duplication*/ + Map<Integer, Integer> valuesToRetain = new HashMap<>(); + int valueHash = 0; + + for (OrcValue value : values) { + valueHash = ((OrcStruct) value.value).hashCode(); Review comment: Have we measured the cost of the hashCode computation? ########## File path: gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtils.java ########## @@ -156,4 +184,386 @@ static boolean isEvolutionValid(TypeDescription fileType, TypeDescription reader return ConvertTreeReaderFactory.canConvert(fileType, readerType); } } + + /** + * Fill in value in OrcStruct with given schema, assuming {@param w} contains the same schema as {@param schema}. + * {@param schema} is still necessary to given given {@param w} do contains schema information itself, because the + * actual value type is only available in {@link TypeDescription} but not {@link org.apache.orc.mapred.OrcValue}. + * + * For simplicity here are some assumptions: + * - We only give 3 primitive values and use them to construct compound values. To make it work for different types that + * can be widened or shrunk to each other, please use value within small range. + * - For List, Map or Union, make sure there's at least one entry within the record-container. 
+ * you may want to try {@link #createValueRecursively(TypeDescription)} instead of {@link OrcStruct#createValue(TypeDescription)} + */ + public static void orcStructFillerWithFixedValue(WritableComparable w, TypeDescription schema, int unionTag, + int intValue, String stringValue, boolean booleanValue) { + switch (schema.getCategory()) { + case BOOLEAN: + ((BooleanWritable) w).set(booleanValue); + break; + case BYTE: + ((ByteWritable) w).set((byte) intValue); + break; + case SHORT: + ((ShortWritable) w).set((short) intValue); + break; + case INT: + ((IntWritable) w).set(intValue); + break; + case LONG: + ((LongWritable) w).set(intValue); + break; + case FLOAT: + ((FloatWritable) w).set(intValue * 1.0f); + break; + case DOUBLE: + ((DoubleWritable) w).set(intValue * 1.0); + break; + case STRING: + case CHAR: + case VARCHAR: + ((Text) w).set(stringValue); + break; + case BINARY: + throw new UnsupportedOperationException("Binary type is not supported in random orc data filler"); + case DECIMAL: + throw new UnsupportedOperationException("Decimal type is not supported in random orc data filler"); + case DATE: + case TIMESTAMP: + case TIMESTAMP_INSTANT: + throw new UnsupportedOperationException( + "Timestamp and its derived types is not supported in random orc data filler"); + case LIST: + OrcList castedList = (OrcList) w; + // Here it is not trivial to create typed-object in element-type. So this method expect the value container + // to at least contain one element, or the traversing within the list will be skipped. 
+ for (Object i : castedList) { + orcStructFillerWithFixedValue((WritableComparable) i, schema.getChildren().get(0), unionTag, intValue, + stringValue, booleanValue); + } + break; + case MAP: + OrcMap castedMap = (OrcMap) w; + for (Object entry : castedMap.entrySet()) { + Map.Entry<WritableComparable, WritableComparable> castedEntry = + (Map.Entry<WritableComparable, WritableComparable>) entry; + orcStructFillerWithFixedValue(castedEntry.getKey(), schema.getChildren().get(0), unionTag, intValue, + stringValue, booleanValue); + orcStructFillerWithFixedValue(castedEntry.getValue(), schema.getChildren().get(1), unionTag, intValue, + stringValue, booleanValue); + } + break; + case STRUCT: + OrcStruct castedStruct = (OrcStruct) w; + int fieldIdx = 0; + for (TypeDescription child : schema.getChildren()) { + orcStructFillerWithFixedValue(castedStruct.getFieldValue(fieldIdx), child, unionTag, intValue, stringValue, + booleanValue); + fieldIdx += 1; + } + break; + case UNION: + OrcUnion castedUnion = (OrcUnion) w; + TypeDescription targetMemberSchema = schema.getChildren().get(unionTag); + castedUnion.set(unionTag, createValueRecursively(targetMemberSchema)); + orcStructFillerWithFixedValue((WritableComparable) castedUnion.getObject(), targetMemberSchema, unionTag, + intValue, stringValue, booleanValue); + break; + default: + throw new IllegalArgumentException("Unknown type " + schema.toString()); + } + } + + /** + * The simple API: Union tag by default set to 0. + */ + public static void orcStructFillerWithFixedValue(WritableComparable w, TypeDescription schema, int intValue, + String stringValue, boolean booleanValue) { + orcStructFillerWithFixedValue(w, schema, 0, intValue, stringValue, booleanValue); + } + + /** + * Suppress the warning of type checking: All casts are clearly valid as they are all (sub)elements Orc types. + * Check failure will trigger Cast exception and blow up the process. 
+ */ + @SuppressWarnings("unchecked") + private static WritableComparable structConversionHelper(WritableComparable w, WritableComparable v, + TypeDescription targetSchema) { + + if (w instanceof OrcStruct) { + upConvertOrcStruct((OrcStruct) w, (OrcStruct) v, targetSchema); + } else if (w instanceof OrcList) { + OrcList castedList = (OrcList) w; + OrcList targetList = (OrcList) v; + TypeDescription elementType = targetSchema.getChildren().get(0); + WritableComparable targetListRecordContainer = + targetList.size() > 0 ? (WritableComparable) targetList.get(0) : createValueRecursively(elementType, 0); + targetList.clear(); + + for (int i = 0; i < castedList.size(); i++) { + targetList.add(i, + structConversionHelper((WritableComparable) castedList.get(i), targetListRecordContainer, elementType)); + } + } else if (w instanceof OrcMap) { + OrcMap castedMap = (OrcMap) w; + OrcMap targetMap = (OrcMap) v; + TypeDescription valueSchema = targetSchema.getChildren().get(1); + + // Create recordContainer with the schema of value. + Iterator targetMapEntries = targetMap.values().iterator(); + WritableComparable targetMapRecordContainer = + targetMapEntries.hasNext() ? 
(WritableComparable) targetMapEntries.next() + : createValueRecursively(valueSchema); + + targetMap.clear(); + + for (Object entry : castedMap.entrySet()) { + Map.Entry<WritableComparable, WritableComparable> castedEntry = + (Map.Entry<WritableComparable, WritableComparable>) entry; + + targetMapRecordContainer = + structConversionHelper(castedEntry.getValue(), targetMapRecordContainer, valueSchema); + targetMap.put(castedEntry.getKey(), targetMapRecordContainer); + } + } else if (w instanceof OrcUnion) { + OrcUnion castedUnion = (OrcUnion) w; + OrcUnion targetUnion = (OrcUnion) v; + byte tag = castedUnion.getTag(); + + // ORC doesn't support Union type widening + // Avro doesn't allow it either, reference: https://avro.apache.org/docs/current/spec.html#Schema+Resolution + // As a result, member schema within source and target should be identical. + TypeDescription targetMemberSchema = targetSchema.getChildren().get(tag); + targetUnion.set(tag, structConversionHelper((WritableComparable) castedUnion.getObject(), + (WritableComparable) OrcUtils.createValueRecursively(targetMemberSchema), targetMemberSchema)); + } else { + // Regardless whether type-widening is happening or not, this method copy the value of w into v. + handlePrimitiveWritableComparable(w, v); + } + + // If non-primitive or type-widening is required, v should already be populated by w's value recursively. + return v; + } + + /** + * Recursively convert the {@param oldStruct} into {@param newStruct} whose schema is {@param targetSchema}. + * This serves similar purpose like GenericDatumReader for Avro, which accepts an reader schema and writer schema + * to allow users convert bytes into reader's schema in a compatible approach. + * Calling this method SHALL NOT cause any side-effect for {@param oldStruct}, also it will copy value of each fields + * in {@param oldStruct} into {@param newStruct} recursively. 
Please ensure avoiding unnecessary call as it could + * be pretty expensive if the struct schema is complicated, or contains container objects like array/map. + * + * Note that if newStruct containing things like List/Map (container-type), the up-conversion is doing two things: + * 1. Clear all elements in original containers. + * 2. Make value of container elements in {@param oldStruct} is populated into {@param newStruct} with element-type + * in {@param newStruct} if compatible. + * + * Limitation: + * 1. Does not support up-conversion of key types in Maps. The underlying reasoning is because of the primary format + * from upstream is Avro, which enforces key-type to be string only. + * 2. Conversion from a field A to field B only happens if + * org.apache.gobblin.compaction.mapreduce.orc.OrcValueMapper#isEvolutionValid(A,B) return true. + */ + @VisibleForTesting + public static void upConvertOrcStruct(OrcStruct oldStruct, OrcStruct newStruct, TypeDescription targetSchema) { + + // If target schema is not equal to newStruct's schema, it is a illegal state and doesn't make sense to work through. + Preconditions.checkArgument(newStruct.getSchema().equals(targetSchema)); + + int indexInNewSchema = 0; + List<String> oldSchemaFieldNames = oldStruct.getSchema().getFieldNames(); + List<TypeDescription> oldSchemaTypes = oldStruct.getSchema().getChildren(); + List<TypeDescription> newSchemaTypes = targetSchema.getChildren(); + + for (String fieldName : targetSchema.getFieldNames()) { + if (oldSchemaFieldNames.contains(fieldName) && oldStruct.getFieldValue(fieldName) != null) { + int fieldIndex = oldSchemaFieldNames.indexOf(fieldName); Review comment: Can we avoid the indexOf call? It is an O(N) call where N = number of fields. Can't we just store a map of fieldNames to indexes? 
########## File path: gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtils.java ########## @@ -156,4 +184,386 @@ static boolean isEvolutionValid(TypeDescription fileType, TypeDescription reader return ConvertTreeReaderFactory.canConvert(fileType, readerType); } } + + /** + * Fill in value in OrcStruct with given schema, assuming {@param w} contains the same schema as {@param schema}. Review comment: Is the method primarily to be used for Orc struct generation in test classes? If so, can we move this inside a TestUtils class? ########## File path: gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtils.java ########## @@ -156,4 +184,386 @@ static boolean isEvolutionValid(TypeDescription fileType, TypeDescription reader return ConvertTreeReaderFactory.canConvert(fileType, readerType); } } + + /** + * Fill in value in OrcStruct with given schema, assuming {@param w} contains the same schema as {@param schema}. + * {@param schema} is still necessary to given given {@param w} do contains schema information itself, because the + * actual value type is only available in {@link TypeDescription} but not {@link org.apache.orc.mapred.OrcValue}. + * + * For simplicity here are some assumptions: + * - We only give 3 primitive values and use them to construct compound values. To make it work for different types that + * can be widened or shrunk to each other, please use value within small range. + * - For List, Map or Union, make sure there's at least one entry within the record-container. 
+ * you may want to try {@link #createValueRecursively(TypeDescription)} instead of {@link OrcStruct#createValue(TypeDescription)} + */ + public static void orcStructFillerWithFixedValue(WritableComparable w, TypeDescription schema, int unionTag, + int intValue, String stringValue, boolean booleanValue) { + switch (schema.getCategory()) { + case BOOLEAN: + ((BooleanWritable) w).set(booleanValue); + break; + case BYTE: + ((ByteWritable) w).set((byte) intValue); + break; + case SHORT: + ((ShortWritable) w).set((short) intValue); + break; + case INT: + ((IntWritable) w).set(intValue); + break; + case LONG: + ((LongWritable) w).set(intValue); + break; + case FLOAT: + ((FloatWritable) w).set(intValue * 1.0f); + break; + case DOUBLE: + ((DoubleWritable) w).set(intValue * 1.0); + break; + case STRING: + case CHAR: + case VARCHAR: + ((Text) w).set(stringValue); + break; + case BINARY: + throw new UnsupportedOperationException("Binary type is not supported in random orc data filler"); + case DECIMAL: + throw new UnsupportedOperationException("Decimal type is not supported in random orc data filler"); + case DATE: + case TIMESTAMP: + case TIMESTAMP_INSTANT: + throw new UnsupportedOperationException( + "Timestamp and its derived types is not supported in random orc data filler"); + case LIST: + OrcList castedList = (OrcList) w; + // Here it is not trivial to create typed-object in element-type. So this method expect the value container + // to at least contain one element, or the traversing within the list will be skipped. 
+ for (Object i : castedList) { + orcStructFillerWithFixedValue((WritableComparable) i, schema.getChildren().get(0), unionTag, intValue, + stringValue, booleanValue); + } + break; + case MAP: + OrcMap castedMap = (OrcMap) w; + for (Object entry : castedMap.entrySet()) { + Map.Entry<WritableComparable, WritableComparable> castedEntry = + (Map.Entry<WritableComparable, WritableComparable>) entry; + orcStructFillerWithFixedValue(castedEntry.getKey(), schema.getChildren().get(0), unionTag, intValue, + stringValue, booleanValue); + orcStructFillerWithFixedValue(castedEntry.getValue(), schema.getChildren().get(1), unionTag, intValue, + stringValue, booleanValue); + } + break; + case STRUCT: + OrcStruct castedStruct = (OrcStruct) w; + int fieldIdx = 0; + for (TypeDescription child : schema.getChildren()) { + orcStructFillerWithFixedValue(castedStruct.getFieldValue(fieldIdx), child, unionTag, intValue, stringValue, + booleanValue); + fieldIdx += 1; + } + break; + case UNION: + OrcUnion castedUnion = (OrcUnion) w; + TypeDescription targetMemberSchema = schema.getChildren().get(unionTag); + castedUnion.set(unionTag, createValueRecursively(targetMemberSchema)); + orcStructFillerWithFixedValue((WritableComparable) castedUnion.getObject(), targetMemberSchema, unionTag, + intValue, stringValue, booleanValue); + break; + default: + throw new IllegalArgumentException("Unknown type " + schema.toString()); + } + } + + /** + * The simple API: Union tag by default set to 0. + */ + public static void orcStructFillerWithFixedValue(WritableComparable w, TypeDescription schema, int intValue, + String stringValue, boolean booleanValue) { + orcStructFillerWithFixedValue(w, schema, 0, intValue, stringValue, booleanValue); + } + + /** + * Suppress the warning of type checking: All casts are clearly valid as they are all (sub)elements Orc types. + * Check failure will trigger Cast exception and blow up the process. 
+ */ + @SuppressWarnings("unchecked") + private static WritableComparable structConversionHelper(WritableComparable w, WritableComparable v, + TypeDescription targetSchema) { + + if (w instanceof OrcStruct) { + upConvertOrcStruct((OrcStruct) w, (OrcStruct) v, targetSchema); + } else if (w instanceof OrcList) { + OrcList castedList = (OrcList) w; + OrcList targetList = (OrcList) v; + TypeDescription elementType = targetSchema.getChildren().get(0); + WritableComparable targetListRecordContainer = + targetList.size() > 0 ? (WritableComparable) targetList.get(0) : createValueRecursively(elementType, 0); + targetList.clear(); + + for (int i = 0; i < castedList.size(); i++) { + targetList.add(i, + structConversionHelper((WritableComparable) castedList.get(i), targetListRecordContainer, elementType)); + } + } else if (w instanceof OrcMap) { + OrcMap castedMap = (OrcMap) w; + OrcMap targetMap = (OrcMap) v; + TypeDescription valueSchema = targetSchema.getChildren().get(1); + + // Create recordContainer with the schema of value. + Iterator targetMapEntries = targetMap.values().iterator(); + WritableComparable targetMapRecordContainer = + targetMapEntries.hasNext() ? 
(WritableComparable) targetMapEntries.next() + : createValueRecursively(valueSchema); + + targetMap.clear(); + + for (Object entry : castedMap.entrySet()) { + Map.Entry<WritableComparable, WritableComparable> castedEntry = + (Map.Entry<WritableComparable, WritableComparable>) entry; + + targetMapRecordContainer = + structConversionHelper(castedEntry.getValue(), targetMapRecordContainer, valueSchema); + targetMap.put(castedEntry.getKey(), targetMapRecordContainer); + } + } else if (w instanceof OrcUnion) { + OrcUnion castedUnion = (OrcUnion) w; + OrcUnion targetUnion = (OrcUnion) v; + byte tag = castedUnion.getTag(); + + // ORC doesn't support Union type widening + // Avro doesn't allow it either, reference: https://avro.apache.org/docs/current/spec.html#Schema+Resolution + // As a result, member schema within source and target should be identical. + TypeDescription targetMemberSchema = targetSchema.getChildren().get(tag); + targetUnion.set(tag, structConversionHelper((WritableComparable) castedUnion.getObject(), + (WritableComparable) OrcUtils.createValueRecursively(targetMemberSchema), targetMemberSchema)); + } else { + // Regardless whether type-widening is happening or not, this method copy the value of w into v. + handlePrimitiveWritableComparable(w, v); + } + + // If non-primitive or type-widening is required, v should already be populated by w's value recursively. + return v; + } + + /** + * Recursively convert the {@param oldStruct} into {@param newStruct} whose schema is {@param targetSchema}. + * This serves similar purpose like GenericDatumReader for Avro, which accepts an reader schema and writer schema + * to allow users convert bytes into reader's schema in a compatible approach. + * Calling this method SHALL NOT cause any side-effect for {@param oldStruct}, also it will copy value of each fields + * in {@param oldStruct} into {@param newStruct} recursively. 
Please ensure avoiding unnecessary call as it could + * be pretty expensive if the struct schema is complicated, or contains container objects like array/map. + * + * Note that if newStruct containing things like List/Map (container-type), the up-conversion is doing two things: + * 1. Clear all elements in original containers. + * 2. Make value of container elements in {@param oldStruct} is populated into {@param newStruct} with element-type + * in {@param newStruct} if compatible. + * + * Limitation: + * 1. Does not support up-conversion of key types in Maps. The underlying reasoning is because of the primary format + * from upstream is Avro, which enforces key-type to be string only. + * 2. Conversion from a field A to field B only happens if + * org.apache.gobblin.compaction.mapreduce.orc.OrcValueMapper#isEvolutionValid(A,B) return true. + */ + @VisibleForTesting + public static void upConvertOrcStruct(OrcStruct oldStruct, OrcStruct newStruct, TypeDescription targetSchema) { + + // If target schema is not equal to newStruct's schema, it is a illegal state and doesn't make sense to work through. 
+ Preconditions.checkArgument(newStruct.getSchema().equals(targetSchema)); + + int indexInNewSchema = 0; + List<String> oldSchemaFieldNames = oldStruct.getSchema().getFieldNames(); + List<TypeDescription> oldSchemaTypes = oldStruct.getSchema().getChildren(); + List<TypeDescription> newSchemaTypes = targetSchema.getChildren(); + + for (String fieldName : targetSchema.getFieldNames()) { + if (oldSchemaFieldNames.contains(fieldName) && oldStruct.getFieldValue(fieldName) != null) { + int fieldIndex = oldSchemaFieldNames.indexOf(fieldName); + + TypeDescription oldFieldSchema = oldSchemaTypes.get(fieldIndex); + TypeDescription newFieldSchema = newSchemaTypes.get(indexInNewSchema); + + if (isEvolutionValid(oldFieldSchema, newFieldSchema)) { + WritableComparable oldField = oldStruct.getFieldValue(fieldName); + WritableComparable newField = newStruct.getFieldValue(fieldName); + newField = (newField == null) ? OrcUtils.createValueRecursively(newFieldSchema) : newField; + newStruct.setFieldValue(fieldName, structConversionHelper(oldField, newField, newFieldSchema)); + } else { + throw new SchemaEvolution.IllegalEvolutionException(String + .format("ORC does not support type conversion from file" + " type %s to reader type %s ", + oldFieldSchema.toString(), newFieldSchema.toString())); + } + } else { + newStruct.setFieldValue(fieldName, null); + } + + indexInNewSchema++; + } + } + + /** + * For primitive types of {@link WritableComparable}, supporting ORC-allowed type-widening. + */ + public static void handlePrimitiveWritableComparable(WritableComparable from, WritableComparable to) { Review comment: Can we explain the expected behavior of this method in the JavaDoc? 
########## File path: gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtils.java ########## @@ -156,4 +184,386 @@ static boolean isEvolutionValid(TypeDescription fileType, TypeDescription reader return ConvertTreeReaderFactory.canConvert(fileType, readerType); } } + + /** + * Fill in value in OrcStruct with given schema, assuming {@param w} contains the same schema as {@param schema}. + * {@param schema} is still necessary to given given {@param w} do contains schema information itself, because the + * actual value type is only available in {@link TypeDescription} but not {@link org.apache.orc.mapred.OrcValue}. + * + * For simplicity here are some assumptions: + * - We only give 3 primitive values and use them to construct compound values. To make it work for different types that + * can be widened or shrunk to each other, please use value within small range. + * - For List, Map or Union, make sure there's at least one entry within the record-container. + * you may want to try {@link #createValueRecursively(TypeDescription)} instead of {@link OrcStruct#createValue(TypeDescription)} + */ + public static void orcStructFillerWithFixedValue(WritableComparable w, TypeDescription schema, int unionTag, + int intValue, String stringValue, boolean booleanValue) { + switch (schema.getCategory()) { + case BOOLEAN: + ((BooleanWritable) w).set(booleanValue); + break; + case BYTE: + ((ByteWritable) w).set((byte) intValue); + break; + case SHORT: + ((ShortWritable) w).set((short) intValue); + break; + case INT: + ((IntWritable) w).set(intValue); + break; + case LONG: + ((LongWritable) w).set(intValue); + break; + case FLOAT: + ((FloatWritable) w).set(intValue * 1.0f); + break; + case DOUBLE: + ((DoubleWritable) w).set(intValue * 1.0); + break; + case STRING: + case CHAR: + case VARCHAR: + ((Text) w).set(stringValue); + break; + case BINARY: + throw new UnsupportedOperationException("Binary type is not supported in random orc data filler"); + case 
DECIMAL: + throw new UnsupportedOperationException("Decimal type is not supported in random orc data filler"); + case DATE: + case TIMESTAMP: + case TIMESTAMP_INSTANT: + throw new UnsupportedOperationException( + "Timestamp and its derived types is not supported in random orc data filler"); + case LIST: + OrcList castedList = (OrcList) w; + // Here it is not trivial to create typed-object in element-type. So this method expect the value container + // to at least contain one element, or the traversing within the list will be skipped. + for (Object i : castedList) { + orcStructFillerWithFixedValue((WritableComparable) i, schema.getChildren().get(0), unionTag, intValue, + stringValue, booleanValue); + } + break; + case MAP: + OrcMap castedMap = (OrcMap) w; + for (Object entry : castedMap.entrySet()) { + Map.Entry<WritableComparable, WritableComparable> castedEntry = + (Map.Entry<WritableComparable, WritableComparable>) entry; + orcStructFillerWithFixedValue(castedEntry.getKey(), schema.getChildren().get(0), unionTag, intValue, + stringValue, booleanValue); + orcStructFillerWithFixedValue(castedEntry.getValue(), schema.getChildren().get(1), unionTag, intValue, + stringValue, booleanValue); + } + break; + case STRUCT: + OrcStruct castedStruct = (OrcStruct) w; + int fieldIdx = 0; + for (TypeDescription child : schema.getChildren()) { + orcStructFillerWithFixedValue(castedStruct.getFieldValue(fieldIdx), child, unionTag, intValue, stringValue, + booleanValue); + fieldIdx += 1; + } + break; + case UNION: + OrcUnion castedUnion = (OrcUnion) w; + TypeDescription targetMemberSchema = schema.getChildren().get(unionTag); + castedUnion.set(unionTag, createValueRecursively(targetMemberSchema)); + orcStructFillerWithFixedValue((WritableComparable) castedUnion.getObject(), targetMemberSchema, unionTag, + intValue, stringValue, booleanValue); + break; + default: + throw new IllegalArgumentException("Unknown type " + schema.toString()); + } + } + + /** + * The simple API: Union tag 
by default set to 0. + */ + public static void orcStructFillerWithFixedValue(WritableComparable w, TypeDescription schema, int intValue, + String stringValue, boolean booleanValue) { + orcStructFillerWithFixedValue(w, schema, 0, intValue, stringValue, booleanValue); + } + + /** + * Suppress the warning of type checking: All casts are clearly valid as they are all (sub)elements Orc types. + * Check failure will trigger Cast exception and blow up the process. + */ + @SuppressWarnings("unchecked") + private static WritableComparable structConversionHelper(WritableComparable w, WritableComparable v, Review comment: Can we explain what this method is doing? ########## File path: gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtils.java ########## @@ -156,4 +184,386 @@ static boolean isEvolutionValid(TypeDescription fileType, TypeDescription reader return ConvertTreeReaderFactory.canConvert(fileType, readerType); } } + + /** + * Fill in value in OrcStruct with given schema, assuming {@param w} contains the same schema as {@param schema}. + * {@param schema} is still necessary to given given {@param w} do contains schema information itself, because the + * actual value type is only available in {@link TypeDescription} but not {@link org.apache.orc.mapred.OrcValue}. + * + * For simplicity here are some assumptions: + * - We only give 3 primitive values and use them to construct compound values. To make it work for different types that + * can be widened or shrunk to each other, please use value within small range. + * - For List, Map or Union, make sure there's at least one entry within the record-container. 
+ * you may want to try {@link #createValueRecursively(TypeDescription)} instead of {@link OrcStruct#createValue(TypeDescription)} + */ + public static void orcStructFillerWithFixedValue(WritableComparable w, TypeDescription schema, int unionTag, + int intValue, String stringValue, boolean booleanValue) { + switch (schema.getCategory()) { + case BOOLEAN: + ((BooleanWritable) w).set(booleanValue); + break; + case BYTE: + ((ByteWritable) w).set((byte) intValue); + break; + case SHORT: + ((ShortWritable) w).set((short) intValue); + break; + case INT: + ((IntWritable) w).set(intValue); + break; + case LONG: + ((LongWritable) w).set(intValue); + break; + case FLOAT: + ((FloatWritable) w).set(intValue * 1.0f); + break; + case DOUBLE: + ((DoubleWritable) w).set(intValue * 1.0); + break; + case STRING: + case CHAR: + case VARCHAR: + ((Text) w).set(stringValue); + break; + case BINARY: + throw new UnsupportedOperationException("Binary type is not supported in random orc data filler"); + case DECIMAL: + throw new UnsupportedOperationException("Decimal type is not supported in random orc data filler"); + case DATE: + case TIMESTAMP: + case TIMESTAMP_INSTANT: + throw new UnsupportedOperationException( + "Timestamp and its derived types is not supported in random orc data filler"); + case LIST: + OrcList castedList = (OrcList) w; + // Here it is not trivial to create typed-object in element-type. So this method expect the value container + // to at least contain one element, or the traversing within the list will be skipped. 
+ for (Object i : castedList) { + orcStructFillerWithFixedValue((WritableComparable) i, schema.getChildren().get(0), unionTag, intValue, + stringValue, booleanValue); + } + break; + case MAP: + OrcMap castedMap = (OrcMap) w; + for (Object entry : castedMap.entrySet()) { + Map.Entry<WritableComparable, WritableComparable> castedEntry = + (Map.Entry<WritableComparable, WritableComparable>) entry; + orcStructFillerWithFixedValue(castedEntry.getKey(), schema.getChildren().get(0), unionTag, intValue, + stringValue, booleanValue); + orcStructFillerWithFixedValue(castedEntry.getValue(), schema.getChildren().get(1), unionTag, intValue, + stringValue, booleanValue); + } + break; + case STRUCT: + OrcStruct castedStruct = (OrcStruct) w; + int fieldIdx = 0; + for (TypeDescription child : schema.getChildren()) { + orcStructFillerWithFixedValue(castedStruct.getFieldValue(fieldIdx), child, unionTag, intValue, stringValue, + booleanValue); + fieldIdx += 1; + } + break; + case UNION: + OrcUnion castedUnion = (OrcUnion) w; + TypeDescription targetMemberSchema = schema.getChildren().get(unionTag); + castedUnion.set(unionTag, createValueRecursively(targetMemberSchema)); + orcStructFillerWithFixedValue((WritableComparable) castedUnion.getObject(), targetMemberSchema, unionTag, + intValue, stringValue, booleanValue); + break; + default: + throw new IllegalArgumentException("Unknown type " + schema.toString()); + } + } + + /** + * The simple API: Union tag by default set to 0. + */ + public static void orcStructFillerWithFixedValue(WritableComparable w, TypeDescription schema, int intValue, + String stringValue, boolean booleanValue) { + orcStructFillerWithFixedValue(w, schema, 0, intValue, stringValue, booleanValue); + } + + /** + * Suppress the warning of type checking: All casts are clearly valid as they are all (sub)elements Orc types. + * Check failure will trigger Cast exception and blow up the process. 
+ */ + @SuppressWarnings("unchecked") + private static WritableComparable structConversionHelper(WritableComparable w, WritableComparable v, + TypeDescription targetSchema) { + + if (w instanceof OrcStruct) { + upConvertOrcStruct((OrcStruct) w, (OrcStruct) v, targetSchema); + } else if (w instanceof OrcList) { + OrcList castedList = (OrcList) w; + OrcList targetList = (OrcList) v; + TypeDescription elementType = targetSchema.getChildren().get(0); + WritableComparable targetListRecordContainer = + targetList.size() > 0 ? (WritableComparable) targetList.get(0) : createValueRecursively(elementType, 0); + targetList.clear(); + + for (int i = 0; i < castedList.size(); i++) { + targetList.add(i, + structConversionHelper((WritableComparable) castedList.get(i), targetListRecordContainer, elementType)); + } + } else if (w instanceof OrcMap) { + OrcMap castedMap = (OrcMap) w; + OrcMap targetMap = (OrcMap) v; + TypeDescription valueSchema = targetSchema.getChildren().get(1); + + // Create recordContainer with the schema of value. + Iterator targetMapEntries = targetMap.values().iterator(); + WritableComparable targetMapRecordContainer = + targetMapEntries.hasNext() ? 
(WritableComparable) targetMapEntries.next() + : createValueRecursively(valueSchema); + + targetMap.clear(); + + for (Object entry : castedMap.entrySet()) { + Map.Entry<WritableComparable, WritableComparable> castedEntry = + (Map.Entry<WritableComparable, WritableComparable>) entry; + + targetMapRecordContainer = + structConversionHelper(castedEntry.getValue(), targetMapRecordContainer, valueSchema); + targetMap.put(castedEntry.getKey(), targetMapRecordContainer); + } + } else if (w instanceof OrcUnion) { + OrcUnion castedUnion = (OrcUnion) w; + OrcUnion targetUnion = (OrcUnion) v; + byte tag = castedUnion.getTag(); + + // ORC doesn't support Union type widening + // Avro doesn't allow it either, reference: https://avro.apache.org/docs/current/spec.html#Schema+Resolution + // As a result, member schema within source and target should be identical. + TypeDescription targetMemberSchema = targetSchema.getChildren().get(tag); + targetUnion.set(tag, structConversionHelper((WritableComparable) castedUnion.getObject(), + (WritableComparable) OrcUtils.createValueRecursively(targetMemberSchema), targetMemberSchema)); + } else { + // Regardless whether type-widening is happening or not, this method copy the value of w into v. + handlePrimitiveWritableComparable(w, v); + } + + // If non-primitive or type-widening is required, v should already be populated by w's value recursively. + return v; + } + + /** + * Recursively convert the {@param oldStruct} into {@param newStruct} whose schema is {@param targetSchema}. + * This serves similar purpose like GenericDatumReader for Avro, which accepts an reader schema and writer schema + * to allow users convert bytes into reader's schema in a compatible approach. + * Calling this method SHALL NOT cause any side-effect for {@param oldStruct}, also it will copy value of each fields + * in {@param oldStruct} into {@param newStruct} recursively. 
Please ensure avoiding unnecessary call as it could + * be pretty expensive if the struct schema is complicated, or contains container objects like array/map. + * + * Note that if newStruct containing things like List/Map (container-type), the up-conversion is doing two things: + * 1. Clear all elements in original containers. + * 2. Make value of container elements in {@param oldStruct} is populated into {@param newStruct} with element-type + * in {@param newStruct} if compatible. + * + * Limitation: + * 1. Does not support up-conversion of key types in Maps. The underlying reasoning is because of the primary format + * from upstream is Avro, which enforces key-type to be string only. + * 2. Conversion from a field A to field B only happens if + * org.apache.gobblin.compaction.mapreduce.orc.OrcValueMapper#isEvolutionValid(A,B) return true. + */ + @VisibleForTesting + public static void upConvertOrcStruct(OrcStruct oldStruct, OrcStruct newStruct, TypeDescription targetSchema) { + + // If target schema is not equal to newStruct's schema, it is a illegal state and doesn't make sense to work through. 
+ Preconditions.checkArgument(newStruct.getSchema().equals(targetSchema)); + + int indexInNewSchema = 0; + List<String> oldSchemaFieldNames = oldStruct.getSchema().getFieldNames(); + List<TypeDescription> oldSchemaTypes = oldStruct.getSchema().getChildren(); + List<TypeDescription> newSchemaTypes = targetSchema.getChildren(); + + for (String fieldName : targetSchema.getFieldNames()) { + if (oldSchemaFieldNames.contains(fieldName) && oldStruct.getFieldValue(fieldName) != null) { + int fieldIndex = oldSchemaFieldNames.indexOf(fieldName); + + TypeDescription oldFieldSchema = oldSchemaTypes.get(fieldIndex); + TypeDescription newFieldSchema = newSchemaTypes.get(indexInNewSchema); + + if (isEvolutionValid(oldFieldSchema, newFieldSchema)) { + WritableComparable oldField = oldStruct.getFieldValue(fieldName); + WritableComparable newField = newStruct.getFieldValue(fieldName); + newField = (newField == null) ? OrcUtils.createValueRecursively(newFieldSchema) : newField; + newStruct.setFieldValue(fieldName, structConversionHelper(oldField, newField, newFieldSchema)); + } else { + throw new SchemaEvolution.IllegalEvolutionException(String + .format("ORC does not support type conversion from file" + " type %s to reader type %s ", + oldFieldSchema.toString(), newFieldSchema.toString())); + } + } else { + newStruct.setFieldValue(fieldName, null); + } + + indexInNewSchema++; + } + } + + /** + * For primitive types of {@link WritableComparable}, supporting ORC-allowed type-widening. 
+ */ + public static void handlePrimitiveWritableComparable(WritableComparable from, WritableComparable to) { + if (from instanceof ByteWritable) { + if (to instanceof ByteWritable) { + ((ByteWritable) to).set(((ByteWritable) from).get()); + return; + } else if (to instanceof ShortWritable) { + ((ShortWritable) to).set(((ByteWritable) from).get()); + return; + } else if (to instanceof IntWritable) { + ((IntWritable) to).set(((ByteWritable) from).get()); + return; + } else if (to instanceof LongWritable) { + ((LongWritable) to).set(((ByteWritable) from).get()); + return; + } else if (to instanceof DoubleWritable) { + ((DoubleWritable) to).set(((ByteWritable) from).get()); + return; + } + } else if (from instanceof ShortWritable) { + if (to instanceof ShortWritable) { + ((ShortWritable) to).set(((ShortWritable) from).get()); + return; + } else if (to instanceof IntWritable) { + ((IntWritable) to).set(((ShortWritable) from).get()); + return; + } else if (to instanceof LongWritable) { + ((LongWritable) to).set(((ShortWritable) from).get()); + return; + } else if (to instanceof DoubleWritable) { + ((DoubleWritable) to).set(((ShortWritable) from).get()); + return; + } + } else if (from instanceof IntWritable) { + if (to instanceof IntWritable) { + ((IntWritable) to).set(((IntWritable) from).get()); + return; + } else if (to instanceof LongWritable) { + ((LongWritable) to).set(((IntWritable) from).get()); + return; + } else if (to instanceof DoubleWritable) { + ((DoubleWritable) to).set(((IntWritable) from).get()); + return; + } + } else if (from instanceof LongWritable) { + if (to instanceof LongWritable) { + ((LongWritable) to).set(((LongWritable) from).get()); + return; + } else if (to instanceof DoubleWritable) { + ((DoubleWritable) to).set(((LongWritable) from).get()); + return; + } + // Following from this branch, type-widening is not allowed and only value-copy will happen. 
+ } else if (from instanceof DoubleWritable) { + if (to instanceof DoubleWritable) { + ((DoubleWritable) to).set(((DoubleWritable) from).get()); + return; + } + } else if (from instanceof BytesWritable) { + if (to instanceof BytesWritable) { + ((BytesWritable) to).set((BytesWritable) from); + return; + } + } else if (from instanceof FloatWritable) { + if (to instanceof FloatWritable) { + ((FloatWritable) to).set(((FloatWritable) from).get()); + return; + } + } else if (from instanceof Text) { + if (to instanceof Text) { + ((Text) to).set((Text) from); + return; + } + } else if (from instanceof DateWritable) { + if (to instanceof DateWritable) { + ((DateWritable) to).set(((DateWritable) from).get()); + return; + } + } else if (from instanceof OrcTimestamp && to instanceof OrcTimestamp) { + ((OrcTimestamp) to).set(((OrcTimestamp) from).toString()); + return; + } else if (from instanceof HiveDecimalWritable && to instanceof HiveDecimalWritable) { + ((HiveDecimalWritable) to).set(((HiveDecimalWritable) from).getHiveDecimal()); + return; + } + throw new UnsupportedOperationException(String + .format("The conversion of primitive-type WritableComparable object from %s to %s is not supported", + from.getClass(), to.getClass())); + } + + /** + * For nested structure like struct<a:array<struct<int,string>>>, calling OrcStruct.createValue doesn't create entry for the inner + * list, which would be required to assign a value if the entry-type has nested structure, or it just cannot see the + * entry's nested structure. + * + * This function should be fed back to open-source ORC. Review comment: "TODO: This function should be fed back to open-source ORC."? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org