nsivabalan commented on code in PR #6905:
URL: https://github.com/apache/hudi/pull/6905#discussion_r1025891341


##########
hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java:
##########
@@ -704,10 +705,25 @@ public static Schema getNullSchema() {
    * @return sanitized name
    */
   public static String sanitizeName(String name) {
-    if (name.substring(0, 1).matches(INVALID_AVRO_FIRST_CHAR_IN_NAMES)) {
-      name = name.replaceFirst(INVALID_AVRO_FIRST_CHAR_IN_NAMES, 
MASK_FOR_INVALID_CHARS_IN_NAMES);
+    return sanitizeName(name, MASK_FOR_INVALID_CHARS_IN_NAMES);

Review Comment:
   Where is this method used? Should we remove it, or use the default value
of the config AVRO_FIELD_NAME_INVALID_CHAR_MASK instead?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java:
##########
@@ -68,7 +186,8 @@ public InputBatch<JavaRDD<GenericRecord>> 
fetchNewDataInAvroFormat(Option<String
             r.getCheckpointForNextBatch(), r.getSchemaProvider());
       }
       case ROW: {
-        InputBatch<Dataset<Row>> r = ((Source<Dataset<Row>>) 
source).fetchNext(lastCkptStr, sourceLimit);
+        InputBatch<Dataset<Row>> r = 
trySanitizeFieldNames(((Source<Dataset<Row>>) source).fetchNext(lastCkptStr, 
sourceLimit),

Review Comment:
   Why is this applicable only to the Row source? What if someone wants to
sanitize field names for other sources as well?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java:
##########
@@ -48,10 +56,120 @@
  */
 public final class SourceFormatAdapter implements Closeable {
 
+  public static class SourceFormatAdapterConfig extends HoodieConfig {
+    // sanitizes invalid columns both in the data read from source and also in 
the schema.
+    // invalid definition here goes by avro naming convention 
(https://avro.apache.org/docs/current/spec.html#names).
+    public static final ConfigProperty<Boolean> SANITIZE_AVRO_FIELD_NAMES = 
ConfigProperty

Review Comment:
   Minor: consider renaming this to SANITIZE_SCHEMA_FIELD_NAMES.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java:
##########
@@ -48,10 +56,120 @@
  */
 public final class SourceFormatAdapter implements Closeable {
 
+  public static class SourceFormatAdapterConfig extends HoodieConfig {
+    // sanitizes invalid columns both in the data read from source and also in 
the schema.
+    // invalid definition here goes by avro naming convention 
(https://avro.apache.org/docs/current/spec.html#names).
+    public static final ConfigProperty<Boolean> SANITIZE_AVRO_FIELD_NAMES = 
ConfigProperty
+        .key("hoodie.deltastreamer.source.sanitize.invalid.column.names")
+        .defaultValue(false)
+        .withDocumentation("Sanitizes invalid column names both in the data 
and also in the schema");
+
+    public static final ConfigProperty<String> 
AVRO_FIELD_NAME_INVALID_CHAR_MASK = ConfigProperty
+        .key("hoodie.deltastreamer.source.sanitize.invalid.char.mask")
+        .defaultValue("__")
+        .withDocumentation("Character mask to be used as replacement for 
invalid field names");
+
+    public SourceFormatAdapterConfig() {
+      super();
+    }
+
+    public SourceFormatAdapterConfig(TypedProperties props) {
+      super(props);
+    }
+  }
+
   private final Source source;
+  private final SourceFormatAdapterConfig config;
 
   public SourceFormatAdapter(Source source) {
+    this(source, Option.empty());
+  }
+
+  public SourceFormatAdapter(Source source,
+                             Option<TypedProperties> props) {
     this.source = source;
+    this.config = props.isPresent() ? new 
SourceFormatAdapterConfig(props.get()) : new SourceFormatAdapterConfig();
+  }
+
+  /**
+   * Config that automatically sanitizes the field names as per avro naming 
rules.
+   * @return enabled status.
+   */
+  private boolean isNameSanitizingEnabled() {
+    return 
config.getBooleanOrDefault(SourceFormatAdapterConfig.SANITIZE_AVRO_FIELD_NAMES);
+  }
+
+  /**
+   * Replacement mask for invalid characters encountered in avro names.
+   * @return sanitized value.
+   */
+  private String getInvalidCharMask() {
+    return 
config.getStringOrDefault(SourceFormatAdapterConfig.AVRO_FIELD_NAME_INVALID_CHAR_MASK);
+  }
+
+  private static DataType sanitizeDataTypeForAvro(DataType dataType, String 
invalidCharMask) {
+    if (dataType instanceof ArrayType) {
+      ArrayType arrayType = (ArrayType) dataType;
+      DataType sanitizedDataType = 
sanitizeDataTypeForAvro(arrayType.elementType(), invalidCharMask);
+      return new ArrayType(sanitizedDataType, arrayType.containsNull());
+    } else if (dataType instanceof MapType) {
+      MapType mapType = (MapType) dataType;
+      DataType sanitizedKeyDataType = 
sanitizeDataTypeForAvro(mapType.keyType(), invalidCharMask);
+      DataType sanitizedValueDataType = 
sanitizeDataTypeForAvro(mapType.valueType(), invalidCharMask);
+      return new MapType(sanitizedKeyDataType, sanitizedValueDataType, 
mapType.valueContainsNull());
+    } else if (dataType instanceof StructType) {
+      return sanitizeStructTypeForAvro((StructType) dataType, invalidCharMask);
+    }
+    return dataType;
+  }
+
+  // TODO: Rebase this to use InternalSchema when it is ready.

Review Comment:
   Can you file a follow-up JIRA for this?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java:
##########
@@ -48,10 +56,120 @@
  */
 public final class SourceFormatAdapter implements Closeable {
 
+  public static class SourceFormatAdapterConfig extends HoodieConfig {
+    // sanitizes invalid columns both in the data read from source and also in 
the schema.
+    // invalid definition here goes by avro naming convention 
(https://avro.apache.org/docs/current/spec.html#names).
+    public static final ConfigProperty<Boolean> SANITIZE_AVRO_FIELD_NAMES = 
ConfigProperty

Review Comment:
   You can change the config key accordingly as well.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java:
##########
@@ -50,19 +67,98 @@ public static class Config {
 
   protected Schema targetSchema;
 
+  private static List<Object> transformList(List<Object> src, String 
invalidCharMask) {
+    return src.stream().map(obj -> {
+      if (obj instanceof List) {
+        return transformList((List<Object>) obj, invalidCharMask);
+      } else if (obj instanceof Map) {
+        return transformMap((Map<String, Object>) obj, invalidCharMask);
+      } else {
+        return obj;
+      }
+    }).collect(Collectors.toList());
+  }
+
+  private static Map<String, Object> transformMap(Map<String, Object> src, 
String invalidCharMask) {
+    return src.entrySet().stream()
+        .map(kv -> {
+          if (kv.getValue() instanceof List) {
+            return Pair.of(kv.getKey(), transformList((List<Object>) 
kv.getValue(), invalidCharMask));
+          } else if (kv.getValue() instanceof Map) {
+            return Pair.of(kv.getKey(), transformMap((Map<String, Object>) 
kv.getValue(), invalidCharMask));
+          } else if (kv.getValue() instanceof String) {
+            String currentStrValue = (String) kv.getValue();
+            if (kv.getKey().equals(AVRO_FIELD_NAME_KEY)) {
+              return Pair.of(kv.getKey(), 
HoodieAvroUtils.sanitizeName(currentStrValue, invalidCharMask));
+            }
+            return Pair.of(kv.getKey(), currentStrValue);
+          } else {
+            return Pair.of(kv.getKey(), kv.getValue());
+          }
+        }).collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
+  }
+
+  private static Option<Schema> parseSanitizedAvroSchemaNoThrow(String 
schemaStr, String invalidCharMask) {
+    try {
+      ObjectMapper objectMapper = new ObjectMapper();
+      objectMapper.enable(JsonParser.Feature.ALLOW_COMMENTS);
+      Map<String, Object> objMap = objectMapper.readValue(schemaStr, 
Map.class);
+      Map<String, Object> modifiedMap = transformMap(objMap, invalidCharMask);
+      return Option.of(new 
Schema.Parser().parse(objectMapper.writeValueAsString(modifiedMap)));
+    } catch (Exception ex) {
+      return Option.empty();
+    }
+  }
+
+  /*
+   * We first rely on Avro to parse and then try to rename only for those 
failed.
+   * This way we can improve our parsing capabilities without breaking 
existing functionality.
+   * For example we don't yet support multiple named schemas defined in a file.
+   */
+  private static Schema parseAvroSchema(String schemaStr, boolean 
sanitizeSchema, String invalidCharMask) {
+    try {
+      return new Schema.Parser().parse(schemaStr);
+    } catch (SchemaParseException spe) {
+      // if sanitizing is not enabled rethrow the exception.
+      if (!sanitizeSchema) {
+        throw spe;
+      }
+      // Rename avro fields and try parsing once again.
+      Option<Schema> parseResult = parseSanitizedAvroSchemaNoThrow(schemaStr, 
invalidCharMask);
+      if (!parseResult.isPresent()) {
+        // throw original exception.
+        throw spe;
+      }
+      return parseResult.get();
+    }
+  }
+
+  private static Schema readAvroSchemaFromFile(String schemaPath, FileSystem 
fs, boolean sanitizeSchema, String invalidCharMask) {
+    String schemaStr;
+    FSDataInputStream in = null;
+    try {

Review Comment:
   Using try-with-resources might reduce the number of lines.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to