vamshigv commented on code in PR #6905:
URL: https://github.com/apache/hudi/pull/6905#discussion_r1028470927


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java:
##########
@@ -50,19 +67,98 @@ public static class Config {
 
   protected Schema targetSchema;
 
+  /**
+   * Walks a list and returns a new list in which every nested list or map has been
+   * passed through {@code transformList} / {@code transformMap} respectively.
+   * Elements that are neither a list nor a map are carried over unchanged.
+   *
+   * @param src             list to walk
+   * @param invalidCharMask mask handed down unchanged to the nested transformations
+   * @return a new list holding the transformed elements in their original order
+   */
+  private static List<Object> transformList(List<Object> src, String invalidCharMask) {
+    return src.stream().map(element -> {
+      // Containers are descended into; anything else is passed through as-is.
+      if (element instanceof List) {
+        return transformList((List<Object>) element, invalidCharMask);
+      }
+      if (element instanceof Map) {
+        return transformMap((Map<String, Object>) element, invalidCharMask);
+      }
+      return element;
+    }).collect(Collectors.toList());
+  }
+
+  /**
+   * Recursively copies a map, sanitizing every string value stored under
+   * {@code AVRO_FIELD_NAME_KEY} via {@link HoodieAvroUtils#sanitizeName}.
+   * Nested lists and maps are transformed recursively; all other values, including
+   * {@code null}, are copied unchanged.
+   *
+   * @param src             map to copy
+   * @param invalidCharMask mask passed to {@code HoodieAvroUtils.sanitizeName} and to nested calls
+   * @return a new map with the same keys, in the iteration order of {@code src}
+   */
+  private static Map<String, Object> transformMap(Map<String, Object> src, String invalidCharMask) {
+    // Built by hand rather than with Collectors.toMap: toMap throws NullPointerException for a
+    // null value, and a JSON null (e.g. "default": null) arrives here as a Java null.
+    Map<String, Object> result = new java.util.LinkedHashMap<>();
+    for (Map.Entry<String, Object> kv : src.entrySet()) {
+      String key = kv.getKey();
+      Object value = kv.getValue();
+      if (value instanceof List) {
+        result.put(key, transformList((List<Object>) value, invalidCharMask));
+      } else if (value instanceof Map) {
+        result.put(key, transformMap((Map<String, Object>) value, invalidCharMask));
+      } else if (value instanceof String && key.equals(AVRO_FIELD_NAME_KEY)) {
+        // Only string values under the name key are rewritten.
+        result.put(key, HoodieAvroUtils.sanitizeName((String) value, invalidCharMask));
+      } else {
+        result.put(key, value);
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Best-effort parse of a schema string after sanitizing it: the string is read as JSON
+   * (comments allowed) into a map, run through {@code transformMap}, serialized back to JSON
+   * and handed to the Avro schema parser.
+   *
+   * @param schemaStr       schema text to read as JSON
+   * @param invalidCharMask mask forwarded to {@code transformMap}
+   * @return the parsed schema, or an empty {@code Option} if any step throws
+   */
+  private static Option<Schema> parseSanitizedAvroSchemaNoThrow(String schemaStr, String invalidCharMask) {
+    try {
+      ObjectMapper jsonMapper = new ObjectMapper();
+      jsonMapper.enable(JsonParser.Feature.ALLOW_COMMENTS);
+      Map<String, Object> rawSchema = jsonMapper.readValue(schemaStr, Map.class);
+      String sanitizedJson = jsonMapper.writeValueAsString(transformMap(rawSchema, invalidCharMask));
+      Schema parsed = new Schema.Parser().parse(sanitizedJson);
+      return Option.of(parsed);
+    } catch (Exception ignored) {
+      // Deliberately swallowed: this method reports failure as an empty Option, never by throwing.
+      return Option.empty();
+    }
+  }
+
+  /*
+   * We first let Avro parse the schema as-is, and only attempt to rename fields when that parse fails.
+   * This way we can improve our parsing capabilities without breaking existing functionality.
+   * For example, we don't yet support multiple named schemas defined in a file.
+   */
+  private static Schema parseAvroSchema(String schemaStr, boolean sanitizeSchema, String invalidCharMask) {
+    try {
+      return new Schema.Parser().parse(schemaStr);
+    } catch (SchemaParseException originalFailure) {
+      // Only attempt the rename-and-reparse fallback when sanitizing is enabled.
+      Option<Schema> sanitized = sanitizeSchema
+          ? parseSanitizedAvroSchemaNoThrow(schemaStr, invalidCharMask)
+          : Option.empty();
+      if (sanitized.isPresent()) {
+        return sanitized.get();
+      }
+      // Sanitizing was disabled or did not help: surface the original parse error.
+      throw originalFailure;
+    }
+  }
+
+  private static Schema readAvroSchemaFromFile(String schemaPath, FileSystem 
fs, boolean sanitizeSchema, String invalidCharMask) {
+    String schemaStr;
+    FSDataInputStream in = null;
+    try {

Review Comment:
   updated



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to