nsivabalan commented on code in PR #8010:
URL: https://github.com/apache/hudi/pull/8010#discussion_r1113419832


##########
hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java:
##########
@@ -720,10 +721,22 @@ public static Schema getNullSchema() {
    * @return sanitized name
    */
   public static String sanitizeName(String name) {
-    if (name.substring(0, 1).matches(INVALID_AVRO_FIRST_CHAR_IN_NAMES)) {
-      name = name.replaceFirst(INVALID_AVRO_FIRST_CHAR_IN_NAMES, 
MASK_FOR_INVALID_CHARS_IN_NAMES);
+    return sanitizeName(name, MASK_FOR_INVALID_CHARS_IN_NAMES);
+  }
+
+  /**
+   * Sanitizes Name according to Avro rule for names.
+   * Removes characters other than the ones mentioned in 
https://avro.apache.org/docs/current/spec.html#names .
+   *
+   * @param name input name
+   * @param invalidCharMask replacement for invalid characters.
+   * @return sanitized name
+   */
+  public static String sanitizeName(String name, String invalidCharMask) {
+    if (INVALID_AVRO_FIRST_CHAR_IN_NAMES_PATTERN.matcher(name.substring(0, 
1)).matches()) {

Review Comment:
   shouldn't this be inverse? 
   if first char matches ^0-9 or if first char does not match ^A-Za-z, we 
should replace. can you help me understand what am I missing here



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java:
##########
@@ -124,6 +261,7 @@ public InputBatch<Dataset<Row>> 
fetchNewDataInRowFormat(Option<String> lastCkptS
             r.getCheckpointForNextBatch(), r.getSchemaProvider());
       }
       case PROTO: {
+        //sanitizing is not done, but could be implemented if needed

Review Comment:
   we can do checkArg and throw exception si sanitization is enabled for proto 
source. 



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java:
##########
@@ -49,9 +57,119 @@
 public final class SourceFormatAdapter implements Closeable {
 
   private final Source source;
+  private final SourceFormatAdapterConfig config;
+
+  public static class SourceFormatAdapterConfig extends HoodieConfig {

Review Comment:
   lets add config annotation.
   lets move this to a separate class and not embed here. 
   also, may be we should name this DeltaStreamerSourceConfigs.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java:
##########
@@ -60,15 +178,20 @@ public SourceFormatAdapter(Source source) {
   public InputBatch<JavaRDD<GenericRecord>> 
fetchNewDataInAvroFormat(Option<String> lastCkptStr, long sourceLimit) {
     switch (source.getSourceType()) {
       case AVRO:
+        //don't need to sanitize because it's already avro
         return ((Source<JavaRDD<GenericRecord>>) 
source).fetchNext(lastCkptStr, sourceLimit);
       case JSON: {
+        //sanitizing is done inside the convertor

Review Comment:
   nit. end doc with " if enabled"



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java:
##########
@@ -79,4 +92,80 @@ public Schema getTargetSchema() {
       return super.getTargetSchema();
     }
   }
+
+  private static Schema readAvroSchemaFromFile(String schemaPath, FileSystem 
fs, boolean sanitizeSchema, String invalidCharMask) {
+    String schemaStr;
+    try (FSDataInputStream in = fs.open(new Path(schemaPath))) {
+      schemaStr = FileIOUtils.readAsUTFString(in);
+    } catch (IOException ioe) {
+      throw new HoodieIOException(String.format("Error reading schema from 
file %s", schemaPath), ioe);
+    }
+    return parseAvroSchema(schemaStr, sanitizeSchema, invalidCharMask);
+  }
+
+  /*
+   * We first rely on Avro to parse and then try to rename only for those 
failed.
+   * This way we can improve our parsing capabilities without breaking 
existing functionality.
+   * For example, we don't yet support multiple named schemas defined in a 
file.
+   */
+  private static Schema parseAvroSchema(String schemaStr, boolean 
sanitizeSchema, String invalidCharMask) {

Review Comment:
   lets try to use the same variable name, argument name across the code base. 
   for eg, shouldSanitize, sanitizeSchema should be unified.
   



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java:
##########
@@ -79,4 +92,80 @@ public Schema getTargetSchema() {
       return super.getTargetSchema();
     }
   }
+
+  private static Schema readAvroSchemaFromFile(String schemaPath, FileSystem 
fs, boolean sanitizeSchema, String invalidCharMask) {
+    String schemaStr;
+    try (FSDataInputStream in = fs.open(new Path(schemaPath))) {
+      schemaStr = FileIOUtils.readAsUTFString(in);
+    } catch (IOException ioe) {
+      throw new HoodieIOException(String.format("Error reading schema from 
file %s", schemaPath), ioe);
+    }
+    return parseAvroSchema(schemaStr, sanitizeSchema, invalidCharMask);
+  }
+
+  /*
+   * We first rely on Avro to parse and then try to rename only for those 
failed.
+   * This way we can improve our parsing capabilities without breaking 
existing functionality.
+   * For example, we don't yet support multiple named schemas defined in a 
file.
+   */
+  private static Schema parseAvroSchema(String schemaStr, boolean 
sanitizeSchema, String invalidCharMask) {
+    try {
+      return new Schema.Parser().parse(schemaStr);
+    } catch (SchemaParseException spe) {
+      // if sanitizing is not enabled rethrow the exception.
+      if (!sanitizeSchema) {
+        throw spe;
+      }
+      // Rename avro fields and try parsing once again.
+      Option<Schema> parseResult = parseSanitizedAvroSchemaNoThrow(schemaStr, 
invalidCharMask);
+      if (!parseResult.isPresent()) {
+        // throw original exception.
+        throw spe;
+      }
+      return parseResult.get();
+    }
+  }
+
+  private static List<Object> transformList(List<Object> src, String 
invalidCharMask) {

Review Comment:
   java docs



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java:
##########
@@ -49,9 +57,119 @@
 public final class SourceFormatAdapter implements Closeable {
 
   private final Source source;
+  private final SourceFormatAdapterConfig config;
+
+  public static class SourceFormatAdapterConfig extends HoodieConfig {
+    // sanitizes names of invalid schema fields both in the data read from 
source and also in the schema.
+    // invalid definition here goes by avro naming convention 
(https://avro.apache.org/docs/current/spec.html#names).
+    public static final ConfigProperty<String> SANITIZE_SCHEMA_FIELD_NAMES = 
ConfigProperty
+        .key("hoodie.deltastreamer.source.sanitize.invalid.schema.field.names")
+        .defaultValue("false")
+        .withDocumentation("Sanitizes invalid schema field names both in the 
data and also in the schema");
+
+    public static final ConfigProperty<String> 
SCHEMA_FIELD_NAME_INVALID_CHAR_MASK = ConfigProperty
+        .key("hoodie.deltastreamer.source.sanitize.invalid.char.mask")
+        .defaultValue("__")
+        .withDocumentation("Character mask to be used as replacement for 
invalid field names");
+
+    public SourceFormatAdapterConfig() {
+      super();
+    }
+
+    public SourceFormatAdapterConfig(TypedProperties props) {
+      super(props);
+    }
+  }
 
   public SourceFormatAdapter(Source source) {
+    this(source, Option.empty());
+  }
+
+  public SourceFormatAdapter(Source source,
+                             Option<TypedProperties> props) {
     this.source = source;
+    this.config = props.isPresent() ? new 
SourceFormatAdapterConfig((props.get())) : new SourceFormatAdapterConfig();
+  }
+
+  /**
+   * Config that automatically sanitizes the field names as per avro naming 
rules.
+   * @return enabled status.
+   */
+  private boolean isNameSanitizingEnabled() {
+    return 
config.getBooleanOrDefault(SourceFormatAdapterConfig.SANITIZE_SCHEMA_FIELD_NAMES);
+  }
+
+  /**
+   * Replacement mask for invalid characters encountered in avro names.
+   * @return sanitized value.
+   */
+  private String getInvalidCharMask() {
+    return 
config.getStringOrDefault(SourceFormatAdapterConfig.SCHEMA_FIELD_NAME_INVALID_CHAR_MASK);
+  }
+
+  private static DataType sanitizeDataTypeForAvro(DataType dataType, String 
invalidCharMask) {
+    if (dataType instanceof ArrayType) {
+      ArrayType arrayType = (ArrayType) dataType;
+      DataType sanitizedDataType = 
sanitizeDataTypeForAvro(arrayType.elementType(), invalidCharMask);
+      return new ArrayType(sanitizedDataType, arrayType.containsNull());
+    } else if (dataType instanceof MapType) {
+      MapType mapType = (MapType) dataType;
+      DataType sanitizedKeyDataType = 
sanitizeDataTypeForAvro(mapType.keyType(), invalidCharMask);
+      DataType sanitizedValueDataType = 
sanitizeDataTypeForAvro(mapType.valueType(), invalidCharMask);
+      return new MapType(sanitizedKeyDataType, sanitizedValueDataType, 
mapType.valueContainsNull());
+    } else if (dataType instanceof StructType) {
+      return sanitizeStructTypeForAvro((StructType) dataType, invalidCharMask);
+    }
+    return dataType;
+  }
+
+  // TODO(HUDI-5256): Refactor this to use InternalSchema when it is ready.
+  private static StructType sanitizeStructTypeForAvro(StructType structType, 
String invalidCharMask) {
+    StructType sanitizedStructType = new StructType();
+    StructField[] structFields = structType.fields();
+    for (StructField s : structFields) {
+      DataType currFieldDataTypeSanitized = 
sanitizeDataTypeForAvro(s.dataType(), invalidCharMask);
+      StructField structFieldCopy = new 
StructField(HoodieAvroUtils.sanitizeName(s.name(), invalidCharMask),
+          currFieldDataTypeSanitized, s.nullable(), s.metadata());
+      sanitizedStructType = sanitizedStructType.add(structFieldCopy);
+    }
+    return sanitizedStructType;
+  }
+
+  private static Dataset<Row> sanitizeColumnNamesForAvro(Dataset<Row> 
inputDataset, String invalidCharMask) {

Review Comment:
   lets move these static method to some Utils. SanitizeUtils or something. and 
lets keep the SourceFormatAdaptor lean.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java:
##########
@@ -49,9 +57,119 @@
 public final class SourceFormatAdapter implements Closeable {
 
   private final Source source;
+  private final SourceFormatAdapterConfig config;
+
+  public static class SourceFormatAdapterConfig extends HoodieConfig {
+    // sanitizes names of invalid schema fields both in the data read from 
source and also in the schema.
+    // invalid definition here goes by avro naming convention 
(https://avro.apache.org/docs/current/spec.html#names).
+    public static final ConfigProperty<String> SANITIZE_SCHEMA_FIELD_NAMES = 
ConfigProperty
+        .key("hoodie.deltastreamer.source.sanitize.invalid.schema.field.names")
+        .defaultValue("false")
+        .withDocumentation("Sanitizes invalid schema field names both in the 
data and also in the schema");
+
+    public static final ConfigProperty<String> 
SCHEMA_FIELD_NAME_INVALID_CHAR_MASK = ConfigProperty
+        .key("hoodie.deltastreamer.source.sanitize.invalid.char.mask")
+        .defaultValue("__")
+        .withDocumentation("Character mask to be used as replacement for 
invalid field names");
+
+    public SourceFormatAdapterConfig() {
+      super();
+    }
+
+    public SourceFormatAdapterConfig(TypedProperties props) {
+      super(props);
+    }
+  }
 
   public SourceFormatAdapter(Source source) {
+    this(source, Option.empty());
+  }
+
+  public SourceFormatAdapter(Source source,
+                             Option<TypedProperties> props) {
     this.source = source;
+    this.config = props.isPresent() ? new 
SourceFormatAdapterConfig((props.get())) : new SourceFormatAdapterConfig();
+  }
+
+  /**
+   * Config that automatically sanitizes the field names as per avro naming 
rules.
+   * @return enabled status.
+   */
+  private boolean isNameSanitizingEnabled() {

Review Comment:
   isFieldNameSanitizationEnabled



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java:
##########
@@ -60,15 +178,20 @@ public SourceFormatAdapter(Source source) {
   public InputBatch<JavaRDD<GenericRecord>> 
fetchNewDataInAvroFormat(Option<String> lastCkptStr, long sourceLimit) {
     switch (source.getSourceType()) {
       case AVRO:
+        //don't need to sanitize because it's already avro
         return ((Source<JavaRDD<GenericRecord>>) 
source).fetchNext(lastCkptStr, sourceLimit);
       case JSON: {
+        //sanitizing is done inside the convertor
         InputBatch<JavaRDD<String>> r = ((Source<JavaRDD<String>>) 
source).fetchNext(lastCkptStr, sourceLimit);
-        AvroConvertor convertor = new 
AvroConvertor(r.getSchemaProvider().getSourceSchema());
+        AvroConvertor convertor = new 
AvroConvertor(r.getSchemaProvider().getSourceSchema(), 
isNameSanitizingEnabled(), getInvalidCharMask());
         return new InputBatch<>(Option.ofNullable(r.getBatch().map(rdd -> 
rdd.map(convertor::fromJson)).orElse(null)),
-            r.getCheckpointForNextBatch(), r.getSchemaProvider());
+            r.getCheckpointForNextBatch(),
+            r.getSchemaProvider());
       }
       case ROW: {
-        InputBatch<Dataset<Row>> r = ((Source<Dataset<Row>>) 
source).fetchNext(lastCkptStr, sourceLimit);
+        //we do the sanitizing here

Review Comment:
   nit. end docs w/ "if enabled"



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java:
##########
@@ -94,27 +218,40 @@ public InputBatch<JavaRDD<GenericRecord>> 
fetchNewDataInAvroFormat(Option<String
     }
   }
 
+  private InputBatch<Dataset<Row>> 
avroDataInRowFormat(InputBatch<JavaRDD<GenericRecord>> r) {
+    Schema sourceSchema = r.getSchemaProvider().getSourceSchema();
+    return new InputBatch<>(
+        Option
+            .ofNullable(
+                r.getBatch()
+                    .map(rdd -> 
AvroConversionUtils.createDataFrame(JavaRDD.toRDD(rdd), sourceSchema.toString(),
+                        source.getSparkSession())
+                    )
+                    .orElse(null)),
+        r.getCheckpointForNextBatch(), r.getSchemaProvider());
+  }
+
   /**
    * Fetch new data in row format. If the source provides data in different 
format, they are translated to Row format
    */
   public InputBatch<Dataset<Row>> fetchNewDataInRowFormat(Option<String> 
lastCkptStr, long sourceLimit) {
     switch (source.getSourceType()) {
       case ROW:
-        return ((Source<Dataset<Row>>) source).fetchNext(lastCkptStr, 
sourceLimit);
+        //we do the sanitizing here
+        return trySanitizeFieldNames(((Source<Dataset<Row>>) 
source).fetchNext(lastCkptStr, sourceLimit),
+            isNameSanitizingEnabled(), getInvalidCharMask());
       case AVRO: {
+        //don't need to sanitize because it's already avro
         InputBatch<JavaRDD<GenericRecord>> r = 
((Source<JavaRDD<GenericRecord>>) source).fetchNext(lastCkptStr, sourceLimit);
-        Schema sourceSchema = r.getSchemaProvider().getSourceSchema();
-        return new InputBatch<>(
-            Option
-                .ofNullable(
-                    r.getBatch()
-                        .map(rdd -> 
AvroConversionUtils.createDataFrame(JavaRDD.toRDD(rdd), sourceSchema.toString(),
-                            source.getSparkSession())
-                        )
-                        .orElse(null)),
-            r.getCheckpointForNextBatch(), r.getSchemaProvider());
+        return avroDataInRowFormat(r);
       }
       case JSON: {
+        if (isNameSanitizingEnabled()) {
+          //leverage the json -> avro sanitizing, If more speed is needed, 
then specific json -> row with sanitizing is possible

Review Comment:
   can we file a follow up ticket on this. 



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java:
##########
@@ -94,27 +218,40 @@ public InputBatch<JavaRDD<GenericRecord>> 
fetchNewDataInAvroFormat(Option<String
     }
   }
 
+  private InputBatch<Dataset<Row>> 
avroDataInRowFormat(InputBatch<JavaRDD<GenericRecord>> r) {
+    Schema sourceSchema = r.getSchemaProvider().getSourceSchema();
+    return new InputBatch<>(
+        Option
+            .ofNullable(
+                r.getBatch()
+                    .map(rdd -> 
AvroConversionUtils.createDataFrame(JavaRDD.toRDD(rdd), sourceSchema.toString(),
+                        source.getSparkSession())
+                    )
+                    .orElse(null)),
+        r.getCheckpointForNextBatch(), r.getSchemaProvider());
+  }
+
   /**
    * Fetch new data in row format. If the source provides data in different 
format, they are translated to Row format
    */
   public InputBatch<Dataset<Row>> fetchNewDataInRowFormat(Option<String> 
lastCkptStr, long sourceLimit) {
     switch (source.getSourceType()) {
       case ROW:
-        return ((Source<Dataset<Row>>) source).fetchNext(lastCkptStr, 
sourceLimit);
+        //we do the sanitizing here
+        return trySanitizeFieldNames(((Source<Dataset<Row>>) 
source).fetchNext(lastCkptStr, sourceLimit),
+            isNameSanitizingEnabled(), getInvalidCharMask());
       case AVRO: {
+        //don't need to sanitize because it's already avro
         InputBatch<JavaRDD<GenericRecord>> r = 
((Source<JavaRDD<GenericRecord>>) source).fetchNext(lastCkptStr, sourceLimit);
-        Schema sourceSchema = r.getSchemaProvider().getSourceSchema();
-        return new InputBatch<>(
-            Option
-                .ofNullable(
-                    r.getBatch()
-                        .map(rdd -> 
AvroConversionUtils.createDataFrame(JavaRDD.toRDD(rdd), sourceSchema.toString(),
-                            source.getSparkSession())
-                        )
-                        .orElse(null)),
-            r.getCheckpointForNextBatch(), r.getSchemaProvider());
+        return avroDataInRowFormat(r);
       }
       case JSON: {
+        if (isNameSanitizingEnabled()) {
+          //leverage the json -> avro sanitizing, If more speed is needed, 
then specific json -> row with sanitizing is possible

Review Comment:
   nit. "if more speed is needed" -> "Its feasible to optimize this conversion 
directly from json to avro. More details in HUDI-XXXX"



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java:
##########
@@ -49,9 +57,119 @@
 public final class SourceFormatAdapter implements Closeable {
 
   private final Source source;
+  private final SourceFormatAdapterConfig config;
+
+  public static class SourceFormatAdapterConfig extends HoodieConfig {
+    // sanitizes names of invalid schema fields both in the data read from 
source and also in the schema.
+    // invalid definition here goes by avro naming convention 
(https://avro.apache.org/docs/current/spec.html#names).
+    public static final ConfigProperty<String> SANITIZE_SCHEMA_FIELD_NAMES = 
ConfigProperty
+        .key("hoodie.deltastreamer.source.sanitize.invalid.schema.field.names")
+        .defaultValue("false")
+        .withDocumentation("Sanitizes invalid schema field names both in the 
data and also in the schema");
+
+    public static final ConfigProperty<String> 
SCHEMA_FIELD_NAME_INVALID_CHAR_MASK = ConfigProperty
+        .key("hoodie.deltastreamer.source.sanitize.invalid.char.mask")
+        .defaultValue("__")
+        .withDocumentation("Character mask to be used as replacement for 
invalid field names");
+
+    public SourceFormatAdapterConfig() {
+      super();
+    }
+
+    public SourceFormatAdapterConfig(TypedProperties props) {
+      super(props);
+    }
+  }
 
   public SourceFormatAdapter(Source source) {
+    this(source, Option.empty());
+  }
+
+  public SourceFormatAdapter(Source source,
+                             Option<TypedProperties> props) {
     this.source = source;
+    this.config = props.isPresent() ? new 
SourceFormatAdapterConfig((props.get())) : new SourceFormatAdapterConfig();
+  }
+
+  /**
+   * Config that automatically sanitizes the field names as per avro naming 
rules.
+   * @return enabled status.
+   */
+  private boolean isNameSanitizingEnabled() {
+    return 
config.getBooleanOrDefault(SourceFormatAdapterConfig.SANITIZE_SCHEMA_FIELD_NAMES);
+  }
+
+  /**
+   * Replacement mask for invalid characters encountered in avro names.
+   * @return sanitized value.
+   */
+  private String getInvalidCharMask() {
+    return 
config.getStringOrDefault(SourceFormatAdapterConfig.SCHEMA_FIELD_NAME_INVALID_CHAR_MASK);
+  }
+
+  private static DataType sanitizeDataTypeForAvro(DataType dataType, String 
invalidCharMask) {
+    if (dataType instanceof ArrayType) {
+      ArrayType arrayType = (ArrayType) dataType;
+      DataType sanitizedDataType = 
sanitizeDataTypeForAvro(arrayType.elementType(), invalidCharMask);
+      return new ArrayType(sanitizedDataType, arrayType.containsNull());
+    } else if (dataType instanceof MapType) {
+      MapType mapType = (MapType) dataType;
+      DataType sanitizedKeyDataType = 
sanitizeDataTypeForAvro(mapType.keyType(), invalidCharMask);
+      DataType sanitizedValueDataType = 
sanitizeDataTypeForAvro(mapType.valueType(), invalidCharMask);
+      return new MapType(sanitizedKeyDataType, sanitizedValueDataType, 
mapType.valueContainsNull());
+    } else if (dataType instanceof StructType) {
+      return sanitizeStructTypeForAvro((StructType) dataType, invalidCharMask);
+    }
+    return dataType;
+  }
+
+  // TODO(HUDI-5256): Refactor this to use InternalSchema when it is ready.
+  private static StructType sanitizeStructTypeForAvro(StructType structType, 
String invalidCharMask) {
+    StructType sanitizedStructType = new StructType();
+    StructField[] structFields = structType.fields();
+    for (StructField s : structFields) {
+      DataType currFieldDataTypeSanitized = 
sanitizeDataTypeForAvro(s.dataType(), invalidCharMask);
+      StructField structFieldCopy = new 
StructField(HoodieAvroUtils.sanitizeName(s.name(), invalidCharMask),
+          currFieldDataTypeSanitized, s.nullable(), s.metadata());
+      sanitizedStructType = sanitizedStructType.add(structFieldCopy);
+    }
+    return sanitizedStructType;
+  }
+
+  private static Dataset<Row> sanitizeColumnNamesForAvro(Dataset<Row> 
inputDataset, String invalidCharMask) {
+    StructField[] inputFields = inputDataset.schema().fields();
+    Dataset<Row> targetDataset = inputDataset;
+    for (StructField sf : inputFields) {
+      DataType sanitizedFieldDataType = sanitizeDataTypeForAvro(sf.dataType(), 
invalidCharMask);
+      if (!sanitizedFieldDataType.equals(sf.dataType())) {
+        // Sanitizing column names for nested types can be thought of as going 
from one schema to another
+        // which are structurally similar except for actual column names 
itself. So casting is safe and sufficient.
+        targetDataset = targetDataset.withColumn(sf.name(), 
targetDataset.col(sf.name()).cast(sanitizedFieldDataType));
+      }
+      String possibleRename = HoodieAvroUtils.sanitizeName(sf.name(), 
invalidCharMask);
+      if (!sf.name().equals(possibleRename)) {
+        targetDataset = targetDataset.withColumnRenamed(sf.name(), 
possibleRename);
+      }
+    }
+    return targetDataset;
+  }
+
+  /**
+   * Sanitize all columns including nested ones as per Avro conventions.
+   * @param srcBatch
+   * @param shouldSanitize
+   * @param invalidCharMask
+   * @return sanitized batch.
+   */
+  private static InputBatch<Dataset<Row>> 
trySanitizeFieldNames(InputBatch<Dataset<Row>> srcBatch,

Review Comment:
   mayBeSanitizeFeildNames



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java:
##########
@@ -79,4 +92,80 @@ public Schema getTargetSchema() {
       return super.getTargetSchema();
     }
   }
+
+  private static Schema readAvroSchemaFromFile(String schemaPath, FileSystem 
fs, boolean sanitizeSchema, String invalidCharMask) {
+    String schemaStr;
+    try (FSDataInputStream in = fs.open(new Path(schemaPath))) {
+      schemaStr = FileIOUtils.readAsUTFString(in);
+    } catch (IOException ioe) {
+      throw new HoodieIOException(String.format("Error reading schema from 
file %s", schemaPath), ioe);
+    }
+    return parseAvroSchema(schemaStr, sanitizeSchema, invalidCharMask);
+  }
+
+  /*
+   * We first rely on Avro to parse and then try to rename only for those 
failed.
+   * This way we can improve our parsing capabilities without breaking 
existing functionality.
+   * For example, we don't yet support multiple named schemas defined in a 
file.
+   */
+  private static Schema parseAvroSchema(String schemaStr, boolean 
sanitizeSchema, String invalidCharMask) {
+    try {
+      return new Schema.Parser().parse(schemaStr);
+    } catch (SchemaParseException spe) {
+      // if sanitizing is not enabled rethrow the exception.
+      if (!sanitizeSchema) {
+        throw spe;
+      }
+      // Rename avro fields and try parsing once again.
+      Option<Schema> parseResult = parseSanitizedAvroSchemaNoThrow(schemaStr, 
invalidCharMask);
+      if (!parseResult.isPresent()) {
+        // throw original exception.
+        throw spe;
+      }
+      return parseResult.get();
+    }
+  }
+
+  private static List<Object> transformList(List<Object> src, String 
invalidCharMask) {
+    return src.stream().map(obj -> {
+      if (obj instanceof List) {
+        return transformList((List<Object>) obj, invalidCharMask);
+      } else if (obj instanceof Map) {
+        return transformMap((Map<String, Object>) obj, invalidCharMask);
+      } else {
+        return obj;
+      }
+    }).collect(Collectors.toList());
+  }
+
+  private static Map<String, Object> transformMap(Map<String, Object> src, 
String invalidCharMask) {

Review Comment:
   I will leave it to take a call. lets see if it makes sense to move these 
also to SanitizationUtils



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to