autumnust commented on a change in pull request #2966:
URL: https://github.com/apache/incubator-gobblin/pull/2966#discussion_r419787662



##########
File path: 
gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtils.java
##########
@@ -156,4 +184,386 @@ static boolean isEvolutionValid(TypeDescription fileType, 
TypeDescription reader
       return ConvertTreeReaderFactory.canConvert(fileType, readerType);
     }
   }
+
+  /**
+   * Fill in value in OrcStruct with given schema, assuming {@param w} 
contains the same schema as {@param schema}.
+   * {@param schema} is still necessary to given given {@param w} do contains 
schema information itself, because the
+   * actual value type is only available in {@link TypeDescription} but not 
{@link org.apache.orc.mapred.OrcValue}.
+   *
+   * For simplicity here are some assumptions:
+   * - We only give 3 primitive values and use them to construct compound 
values. To make it work for different types that
+   * can be widened or shrunk to each other, please use value within small 
range.
+   * - For List, Map or Union, make sure there's at least one entry within the 
record-container.
+   * you may want to try {@link #createValueRecursively(TypeDescription)} 
instead of {@link OrcStruct#createValue(TypeDescription)}
+   */
+  public static void orcStructFillerWithFixedValue(WritableComparable w, 
TypeDescription schema, int unionTag,
+      int intValue, String stringValue, boolean booleanValue) {
+    switch (schema.getCategory()) {
+      case BOOLEAN:
+        ((BooleanWritable) w).set(booleanValue);
+        break;
+      case BYTE:
+        ((ByteWritable) w).set((byte) intValue);
+        break;
+      case SHORT:
+        ((ShortWritable) w).set((short) intValue);
+        break;
+      case INT:
+        ((IntWritable) w).set(intValue);
+        break;
+      case LONG:
+        ((LongWritable) w).set(intValue);
+        break;
+      case FLOAT:
+        ((FloatWritable) w).set(intValue * 1.0f);
+        break;
+      case DOUBLE:
+        ((DoubleWritable) w).set(intValue * 1.0);
+        break;
+      case STRING:
+      case CHAR:
+      case VARCHAR:
+        ((Text) w).set(stringValue);
+        break;
+      case BINARY:
+        throw new UnsupportedOperationException("Binary type is not supported 
in random orc data filler");
+      case DECIMAL:
+        throw new UnsupportedOperationException("Decimal type is not supported 
in random orc data filler");
+      case DATE:
+      case TIMESTAMP:
+      case TIMESTAMP_INSTANT:
+        throw new UnsupportedOperationException(
+            "Timestamp and its derived types is not supported in random orc 
data filler");
+      case LIST:
+        OrcList castedList = (OrcList) w;
+        // Here it is not trivial to create typed-object in element-type. So 
this method expect the value container
+        // to at least contain one element, or the traversing within the list 
will be skipped.
+        for (Object i : castedList) {
+          orcStructFillerWithFixedValue((WritableComparable) i, 
schema.getChildren().get(0), unionTag, intValue,
+              stringValue, booleanValue);
+        }
+        break;
+      case MAP:
+        OrcMap castedMap = (OrcMap) w;
+        for (Object entry : castedMap.entrySet()) {
+          Map.Entry<WritableComparable, WritableComparable> castedEntry =
+              (Map.Entry<WritableComparable, WritableComparable>) entry;
+          orcStructFillerWithFixedValue(castedEntry.getKey(), 
schema.getChildren().get(0), unionTag, intValue,
+              stringValue, booleanValue);
+          orcStructFillerWithFixedValue(castedEntry.getValue(), 
schema.getChildren().get(1), unionTag, intValue,
+              stringValue, booleanValue);
+        }
+        break;
+      case STRUCT:
+        OrcStruct castedStruct = (OrcStruct) w;
+        int fieldIdx = 0;
+        for (TypeDescription child : schema.getChildren()) {
+          orcStructFillerWithFixedValue(castedStruct.getFieldValue(fieldIdx), 
child, unionTag, intValue, stringValue,
+              booleanValue);
+          fieldIdx += 1;
+        }
+        break;
+      case UNION:
+        OrcUnion castedUnion = (OrcUnion) w;
+        TypeDescription targetMemberSchema = 
schema.getChildren().get(unionTag);
+        castedUnion.set(unionTag, createValueRecursively(targetMemberSchema));
+        orcStructFillerWithFixedValue((WritableComparable) 
castedUnion.getObject(), targetMemberSchema, unionTag,
+            intValue, stringValue, booleanValue);
+        break;
+      default:
+        throw new IllegalArgumentException("Unknown type " + 
schema.toString());
+    }
+  }
+
+  /**
+   * The simple API: Union tag by default set to 0.
+   */
+  public static void orcStructFillerWithFixedValue(WritableComparable w, 
TypeDescription schema, int intValue,
+      String stringValue, boolean booleanValue) {
+    orcStructFillerWithFixedValue(w, schema, 0, intValue, stringValue, 
booleanValue);
+  }
+
+  /**
+   * Suppress the warning of type checking: All casts are clearly valid as 
they are all (sub)elements Orc types.
+   * Check failure will trigger Cast exception and blow up the process.
+   */
+  @SuppressWarnings("unchecked")
+  private static WritableComparable structConversionHelper(WritableComparable 
w, WritableComparable v,
+      TypeDescription targetSchema) {
+
+    if (w instanceof OrcStruct) {
+      upConvertOrcStruct((OrcStruct) w, (OrcStruct) v, targetSchema);
+    } else if (w instanceof OrcList) {
+      OrcList castedList = (OrcList) w;
+      OrcList targetList = (OrcList) v;
+      TypeDescription elementType = targetSchema.getChildren().get(0);
+      WritableComparable targetListRecordContainer =
+          targetList.size() > 0 ? (WritableComparable) targetList.get(0) : 
createValueRecursively(elementType, 0);
+      targetList.clear();
+
+      for (int i = 0; i < castedList.size(); i++) {
+        targetList.add(i,
+            structConversionHelper((WritableComparable) castedList.get(i), 
targetListRecordContainer, elementType));
+      }
+    } else if (w instanceof OrcMap) {
+      OrcMap castedMap = (OrcMap) w;
+      OrcMap targetMap = (OrcMap) v;
+      TypeDescription valueSchema = targetSchema.getChildren().get(1);
+
+      // Create recordContainer with the schema of value.
+      Iterator targetMapEntries = targetMap.values().iterator();
+      WritableComparable targetMapRecordContainer =
+          targetMapEntries.hasNext() ? (WritableComparable) 
targetMapEntries.next()
+              : createValueRecursively(valueSchema);
+
+      targetMap.clear();
+
+      for (Object entry : castedMap.entrySet()) {
+        Map.Entry<WritableComparable, WritableComparable> castedEntry =
+            (Map.Entry<WritableComparable, WritableComparable>) entry;
+
+        targetMapRecordContainer =
+            structConversionHelper(castedEntry.getValue(), 
targetMapRecordContainer, valueSchema);
+        targetMap.put(castedEntry.getKey(), targetMapRecordContainer);
+      }
+    } else if (w instanceof OrcUnion) {
+      OrcUnion castedUnion = (OrcUnion) w;
+      OrcUnion targetUnion = (OrcUnion) v;
+      byte tag = castedUnion.getTag();
+
+      // ORC doesn't support Union type widening
+      // Avro doesn't allow it either, reference: 
https://avro.apache.org/docs/current/spec.html#Schema+Resolution
+      // As a result, member schema within source and target should be 
identical.
+      TypeDescription targetMemberSchema = targetSchema.getChildren().get(tag);
+      targetUnion.set(tag, structConversionHelper((WritableComparable) 
castedUnion.getObject(),
+          (WritableComparable) 
OrcUtils.createValueRecursively(targetMemberSchema), targetMemberSchema));
+    } else {
+      // Regardless whether type-widening is happening or not, this method 
copy the value of w into v.
+        handlePrimitiveWritableComparable(w, v);
+    }
+
+    // If non-primitive or type-widening is required, v should already be 
populated by w's value recursively.
+    return v;
+  }
+
+  /**
+   * Recursively convert the {@param oldStruct} into {@param newStruct} whose 
schema is {@param targetSchema}.
+   * This serves similar purpose like GenericDatumReader for Avro, which 
accepts an reader schema and writer schema
+   * to allow users convert bytes into reader's schema in a compatible 
approach.
+   * Calling this method SHALL NOT cause any side-effect for {@param 
oldStruct}, also it will copy value of each fields
+   * in {@param oldStruct} into {@param newStruct} recursively. Please ensure 
avoiding unnecessary call as it could
+   * be pretty expensive if the struct schema is complicated, or contains 
container objects like array/map.
+   *
+   * Note that if newStruct containing things like List/Map (container-type), 
the up-conversion is doing two things:
+   * 1. Clear all elements in original containers.
+   * 2. Make value of container elements in {@param oldStruct} is populated 
into {@param newStruct} with element-type
+   * in {@param newStruct} if compatible.
+   *
+   * Limitation:
+   * 1. Does not support up-conversion of key types in Maps. The underlying 
reasoning is because of the primary format
+   * from upstream is Avro, which enforces key-type to be string only.
+   * 2. Conversion from a field A to field B only happens if
+   * 
org.apache.gobblin.compaction.mapreduce.orc.OrcValueMapper#isEvolutionValid(A,B)
 return true.
+   */
+  @VisibleForTesting
+  public static void upConvertOrcStruct(OrcStruct oldStruct, OrcStruct 
newStruct, TypeDescription targetSchema) {
+
+    // If target schema is not equal to newStruct's schema, it is a illegal 
state and doesn't make sense to work through.
+    Preconditions.checkArgument(newStruct.getSchema().equals(targetSchema));
+
+    int indexInNewSchema = 0;
+    List<String> oldSchemaFieldNames = oldStruct.getSchema().getFieldNames();
+    List<TypeDescription> oldSchemaTypes = oldStruct.getSchema().getChildren();
+    List<TypeDescription> newSchemaTypes = targetSchema.getChildren();
+
+    for (String fieldName : targetSchema.getFieldNames()) {
+      if (oldSchemaFieldNames.contains(fieldName) && 
oldStruct.getFieldValue(fieldName) != null) {
+        int fieldIndex = oldSchemaFieldNames.indexOf(fieldName);
+
+        TypeDescription oldFieldSchema = oldSchemaTypes.get(fieldIndex);
+        TypeDescription newFieldSchema = newSchemaTypes.get(indexInNewSchema);
+
+        if (isEvolutionValid(oldFieldSchema, newFieldSchema)) {
+          WritableComparable oldField = oldStruct.getFieldValue(fieldName);
+          WritableComparable newField = newStruct.getFieldValue(fieldName);
+          newField = (newField == null) ? 
OrcUtils.createValueRecursively(newFieldSchema) : newField;
+          newStruct.setFieldValue(fieldName, structConversionHelper(oldField, 
newField, newFieldSchema));
+        } else {
+          throw new SchemaEvolution.IllegalEvolutionException(String
+              .format("ORC does not support type conversion from file" + " 
type %s to reader type %s ",
+                  oldFieldSchema.toString(), newFieldSchema.toString()));
+        }
+      } else {
+        newStruct.setFieldValue(fieldName, null);
+      }
+
+      indexInNewSchema++;
+    }
+  }
+
+  /**
+   * For primitive types of {@link WritableComparable}, supporting ORC-allowed 
type-widening.
+   */
+  public static void handlePrimitiveWritableComparable(WritableComparable 
from, WritableComparable to) {
+    if (from instanceof ByteWritable) {
+      if (to instanceof ByteWritable) {
+        ((ByteWritable) to).set(((ByteWritable) from).get());
+        return;
+      } else if (to instanceof ShortWritable) {
+        ((ShortWritable) to).set(((ByteWritable) from).get());
+        return;
+      } else if (to instanceof IntWritable) {
+        ((IntWritable) to).set(((ByteWritable) from).get());
+        return;
+      } else if (to instanceof LongWritable) {
+        ((LongWritable) to).set(((ByteWritable) from).get());
+        return;
+      } else if (to instanceof DoubleWritable) {
+        ((DoubleWritable) to).set(((ByteWritable) from).get());
+        return;
+      }
+    } else if (from instanceof ShortWritable) {
+      if (to instanceof ShortWritable) {
+        ((ShortWritable) to).set(((ShortWritable) from).get());
+        return;
+      } else if (to instanceof IntWritable) {
+        ((IntWritable) to).set(((ShortWritable) from).get());
+        return;
+      } else if (to instanceof LongWritable) {
+        ((LongWritable) to).set(((ShortWritable) from).get());
+        return;
+      } else if (to instanceof DoubleWritable) {
+        ((DoubleWritable) to).set(((ShortWritable) from).get());
+        return;
+      }
+    } else if (from instanceof IntWritable) {
+      if (to instanceof IntWritable) {
+        ((IntWritable) to).set(((IntWritable) from).get());
+        return;
+      } else if (to instanceof LongWritable) {
+        ((LongWritable) to).set(((IntWritable) from).get());
+        return;
+      } else if (to instanceof DoubleWritable) {
+        ((DoubleWritable) to).set(((IntWritable) from).get());
+        return;
+      }
+    } else if (from instanceof LongWritable) {
+      if (to instanceof LongWritable) {
+        ((LongWritable) to).set(((LongWritable) from).get());
+        return;
+      } else if (to instanceof DoubleWritable) {
+        ((DoubleWritable) to).set(((LongWritable) from).get());
+        return;
+      }
+      // Following from this branch, type-widening is not allowed and only 
value-copy will happen.
+    } else if (from instanceof DoubleWritable) {
+      if (to instanceof DoubleWritable) {
+        ((DoubleWritable) to).set(((DoubleWritable) from).get());
+        return;
+      }
+    } else if (from instanceof BytesWritable) {
+      if (to instanceof BytesWritable) {
+        ((BytesWritable) to).set((BytesWritable) from);
+        return;
+      }
+    } else if (from instanceof FloatWritable) {
+      if (to instanceof FloatWritable) {
+        ((FloatWritable) to).set(((FloatWritable) from).get());
+        return;
+      }
+    } else if (from instanceof Text) {
+      if (to instanceof Text) {
+        ((Text) to).set((Text) from);
+        return;
+      }
+    } else if (from instanceof DateWritable) {
+      if (to instanceof DateWritable) {
+        ((DateWritable) to).set(((DateWritable) from).get());
+        return;
+      }
+    } else if (from instanceof OrcTimestamp && to instanceof OrcTimestamp) {
+      ((OrcTimestamp) to).set(((OrcTimestamp) from).toString());
+      return;
+    } else if (from instanceof HiveDecimalWritable && to instanceof 
HiveDecimalWritable) {
+      ((HiveDecimalWritable) to).set(((HiveDecimalWritable) 
from).getHiveDecimal());
+      return;
+    }
+    throw new UnsupportedOperationException(String
+        .format("The conversion of primitive-type WritableComparable object 
from %s to %s is not supported",
+            from.getClass(), to.getClass()));
+  }
+
+  /**
+   * For a nested structure like struct<a:array<struct<int,string>>>, calling OrcStruct.createValue does not
+   * create an entry for the inner list. Such an entry is required in order to assign a value when the entry
+   * type itself has a nested structure; without it, the entry's nested structure is not visible.
+   *
+   * This function should be contributed back to open-source ORC.
Review comment:
       A side note: I think this could actually be the reason why we are 
seeing one schema evolution failure with a complex nested schema on the ORC reader. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to