vamshigv commented on code in PR #6905:
URL: https://github.com/apache/hudi/pull/6905#discussion_r1028472918
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java:
##########
@@ -48,10 +56,120 @@
*/
public final class SourceFormatAdapter implements Closeable {
+ public static class SourceFormatAdapterConfig extends HoodieConfig {
+ // sanitizes invalid columns both in the data read from source and also in
the schema.
+ // invalid definition here goes by avro naming convention
(https://avro.apache.org/docs/current/spec.html#names).
+ public static final ConfigProperty<Boolean> SANITIZE_AVRO_FIELD_NAMES =
ConfigProperty
+ .key("hoodie.deltastreamer.source.sanitize.invalid.column.names")
+ .defaultValue(false)
+ .withDocumentation("Sanitizes invalid column names both in the data
and also in the schema");
+
+ public static final ConfigProperty<String>
AVRO_FIELD_NAME_INVALID_CHAR_MASK = ConfigProperty
+ .key("hoodie.deltastreamer.source.sanitize.invalid.char.mask")
+ .defaultValue("__")
+ .withDocumentation("Character mask to be used as replacement for
invalid field names");
+
+ public SourceFormatAdapterConfig() {
+ super();
+ }
+
+ public SourceFormatAdapterConfig(TypedProperties props) {
+ super(props);
+ }
+ }
+
private final Source source;
+ private final SourceFormatAdapterConfig config;
public SourceFormatAdapter(Source source) {
+ this(source, Option.empty());
+ }
+
+ public SourceFormatAdapter(Source source,
+ Option<TypedProperties> props) {
this.source = source;
+ this.config = props.isPresent() ? new
SourceFormatAdapterConfig(props.get()) : new SourceFormatAdapterConfig();
+ }
+
+ /**
+ * Config that automatically sanitizes the field names as per avro naming
rules.
+ * @return enabled status.
+ */
+ private boolean isNameSanitizingEnabled() {
+ return
config.getBooleanOrDefault(SourceFormatAdapterConfig.SANITIZE_AVRO_FIELD_NAMES);
+ }
+
+ /**
+ * Replacement mask for invalid characters encountered in avro names.
+ * @return sanitized value.
+ */
+ private String getInvalidCharMask() {
+ return
config.getStringOrDefault(SourceFormatAdapterConfig.AVRO_FIELD_NAME_INVALID_CHAR_MASK);
+ }
+
+ private static DataType sanitizeDataTypeForAvro(DataType dataType, String
invalidCharMask) {
+ if (dataType instanceof ArrayType) {
+ ArrayType arrayType = (ArrayType) dataType;
+ DataType sanitizedDataType =
sanitizeDataTypeForAvro(arrayType.elementType(), invalidCharMask);
+ return new ArrayType(sanitizedDataType, arrayType.containsNull());
+ } else if (dataType instanceof MapType) {
+ MapType mapType = (MapType) dataType;
+ DataType sanitizedKeyDataType =
sanitizeDataTypeForAvro(mapType.keyType(), invalidCharMask);
+ DataType sanitizedValueDataType =
sanitizeDataTypeForAvro(mapType.valueType(), invalidCharMask);
+ return new MapType(sanitizedKeyDataType, sanitizedValueDataType,
mapType.valueContainsNull());
+ } else if (dataType instanceof StructType) {
+ return sanitizeStructTypeForAvro((StructType) dataType, invalidCharMask);
+ }
+ return dataType;
+ }
+
+ // TODO: Rebase this to use InternalSchema when it is ready.
Review Comment:
Done, and added a reference to it in the TODO.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
users@infra.apache.org