vamshigv commented on code in PR #6905:
URL: https://github.com/apache/hudi/pull/6905#discussion_r1028472918


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java:
##########
@@ -48,10 +56,120 @@
  */
 public final class SourceFormatAdapter implements Closeable {
 
+  public static class SourceFormatAdapterConfig extends HoodieConfig {
+    // sanitizes invalid columns both in the data read from source and also in the schema.
+    // invalid definition here goes by avro naming convention (https://avro.apache.org/docs/current/spec.html#names).
+    public static final ConfigProperty<Boolean> SANITIZE_AVRO_FIELD_NAMES = 
ConfigProperty
+        .key("hoodie.deltastreamer.source.sanitize.invalid.column.names")
+        .defaultValue(false)
+        .withDocumentation("Sanitizes invalid column names both in the data 
and also in the schema");
+
+    public static final ConfigProperty<String> 
AVRO_FIELD_NAME_INVALID_CHAR_MASK = ConfigProperty
+        .key("hoodie.deltastreamer.source.sanitize.invalid.char.mask")
+        .defaultValue("__")
+        .withDocumentation("Character mask to be used as replacement for 
invalid field names");
+
+    public SourceFormatAdapterConfig() {
+      super();
+    }
+
+    public SourceFormatAdapterConfig(TypedProperties props) {
+      super(props);
+    }
+  }
+
   private final Source source;
+  private final SourceFormatAdapterConfig config;
 
   public SourceFormatAdapter(Source source) {
+    this(source, Option.empty());
+  }
+
+  public SourceFormatAdapter(Source source,
+                             Option<TypedProperties> props) {
     this.source = source;
+    this.config = props.isPresent() ? new 
SourceFormatAdapterConfig(props.get()) : new SourceFormatAdapterConfig();
+  }
+
+  /**
+   * Config that automatically sanitizes the field names as per avro naming 
rules.
+   * @return enabled status.
+   */
+  private boolean isNameSanitizingEnabled() {
+    return 
config.getBooleanOrDefault(SourceFormatAdapterConfig.SANITIZE_AVRO_FIELD_NAMES);
+  }
+
+  /**
+   * Replacement mask for invalid characters encountered in avro names.
+   * @return the mask string used to replace invalid characters.
+   */
+  private String getInvalidCharMask() {
+    return 
config.getStringOrDefault(SourceFormatAdapterConfig.AVRO_FIELD_NAME_INVALID_CHAR_MASK);
+  }
+
+  private static DataType sanitizeDataTypeForAvro(DataType dataType, String 
invalidCharMask) {
+    if (dataType instanceof ArrayType) {
+      ArrayType arrayType = (ArrayType) dataType;
+      DataType sanitizedDataType = 
sanitizeDataTypeForAvro(arrayType.elementType(), invalidCharMask);
+      return new ArrayType(sanitizedDataType, arrayType.containsNull());
+    } else if (dataType instanceof MapType) {
+      MapType mapType = (MapType) dataType;
+      DataType sanitizedKeyDataType = 
sanitizeDataTypeForAvro(mapType.keyType(), invalidCharMask);
+      DataType sanitizedValueDataType = 
sanitizeDataTypeForAvro(mapType.valueType(), invalidCharMask);
+      return new MapType(sanitizedKeyDataType, sanitizedValueDataType, 
mapType.valueContainsNull());
+    } else if (dataType instanceof StructType) {
+      return sanitizeStructTypeForAvro((StructType) dataType, invalidCharMask);
+    }
+    return dataType;
+  }
+
+  // TODO: Rebase this to use InternalSchema when it is ready.

Review Comment:
   Done; added a reference in the TODO.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to