nsivabalan commented on code in PR #6905:
URL: https://github.com/apache/hudi/pull/6905#discussion_r1025891341
##########
hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java:
##########
@@ -704,10 +705,25 @@ public static Schema getNullSchema() {
* @return sanitized name
*/
public static String sanitizeName(String name) {
- if (name.substring(0, 1).matches(INVALID_AVRO_FIRST_CHAR_IN_NAMES)) {
- name = name.replaceFirst(INVALID_AVRO_FIRST_CHAR_IN_NAMES,
MASK_FOR_INVALID_CHARS_IN_NAMES);
+ return sanitizeName(name, MASK_FOR_INVALID_CHARS_IN_NAMES);
Review Comment:
Where is this method used? Should we remove it, or use the default value
of the config AVRO_FIELD_NAME_INVALID_CHAR_MASK?
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java:
##########
@@ -68,7 +186,8 @@ public InputBatch<JavaRDD<GenericRecord>>
fetchNewDataInAvroFormat(Option<String
r.getCheckpointForNextBatch(), r.getSchemaProvider());
}
case ROW: {
- InputBatch<Dataset<Row>> r = ((Source<Dataset<Row>>)
source).fetchNext(lastCkptStr, sourceLimit);
+ InputBatch<Dataset<Row>> r =
trySanitizeFieldNames(((Source<Dataset<Row>>) source).fetchNext(lastCkptStr,
sourceLimit),
Review Comment:
Why is this applicable only to the Row source? What if someone wants to
sanitize for other sources as well?
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java:
##########
@@ -48,10 +56,120 @@
*/
public final class SourceFormatAdapter implements Closeable {
+ public static class SourceFormatAdapterConfig extends HoodieConfig {
+ // sanitizes invalid columns both in the data read from source and also in
the schema.
+ // invalid definition here goes by avro naming convention
(https://avro.apache.org/docs/current/spec.html#names).
+ public static final ConfigProperty<Boolean> SANITIZE_AVRO_FIELD_NAMES =
ConfigProperty
Review Comment:
Minor: rename to SANITIZE_SCHEMA_FIELD_NAMES.
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java:
##########
@@ -48,10 +56,120 @@
*/
public final class SourceFormatAdapter implements Closeable {
+ public static class SourceFormatAdapterConfig extends HoodieConfig {
+ // sanitizes invalid columns both in the data read from source and also in
the schema.
+ // invalid definition here goes by avro naming convention
(https://avro.apache.org/docs/current/spec.html#names).
+ public static final ConfigProperty<Boolean> SANITIZE_AVRO_FIELD_NAMES =
ConfigProperty
+ .key("hoodie.deltastreamer.source.sanitize.invalid.column.names")
+ .defaultValue(false)
+ .withDocumentation("Sanitizes invalid column names both in the data
and also in the schema");
+
+ public static final ConfigProperty<String>
AVRO_FIELD_NAME_INVALID_CHAR_MASK = ConfigProperty
+ .key("hoodie.deltastreamer.source.sanitize.invalid.char.mask")
+ .defaultValue("__")
+ .withDocumentation("Character mask to be used as replacement for
invalid field names");
+
+ public SourceFormatAdapterConfig() {
+ super();
+ }
+
+ public SourceFormatAdapterConfig(TypedProperties props) {
+ super(props);
+ }
+ }
+
private final Source source;
+ private final SourceFormatAdapterConfig config;
public SourceFormatAdapter(Source source) {
+ this(source, Option.empty());
+ }
+
+ public SourceFormatAdapter(Source source,
+ Option<TypedProperties> props) {
this.source = source;
+ this.config = props.isPresent() ? new
SourceFormatAdapterConfig(props.get()) : new SourceFormatAdapterConfig();
+ }
+
+ /**
+ * Config that automatically sanitizes the field names as per avro naming
rules.
+ * @return enabled status.
+ */
+ private boolean isNameSanitizingEnabled() {
+ return
config.getBooleanOrDefault(SourceFormatAdapterConfig.SANITIZE_AVRO_FIELD_NAMES);
+ }
+
+ /**
+ * Replacement mask for invalid characters encountered in avro names.
+ * @return sanitized value.
+ */
+ private String getInvalidCharMask() {
+ return
config.getStringOrDefault(SourceFormatAdapterConfig.AVRO_FIELD_NAME_INVALID_CHAR_MASK);
+ }
+
+ private static DataType sanitizeDataTypeForAvro(DataType dataType, String
invalidCharMask) {
+ if (dataType instanceof ArrayType) {
+ ArrayType arrayType = (ArrayType) dataType;
+ DataType sanitizedDataType =
sanitizeDataTypeForAvro(arrayType.elementType(), invalidCharMask);
+ return new ArrayType(sanitizedDataType, arrayType.containsNull());
+ } else if (dataType instanceof MapType) {
+ MapType mapType = (MapType) dataType;
+ DataType sanitizedKeyDataType =
sanitizeDataTypeForAvro(mapType.keyType(), invalidCharMask);
+ DataType sanitizedValueDataType =
sanitizeDataTypeForAvro(mapType.valueType(), invalidCharMask);
+ return new MapType(sanitizedKeyDataType, sanitizedValueDataType,
mapType.valueContainsNull());
+ } else if (dataType instanceof StructType) {
+ return sanitizeStructTypeForAvro((StructType) dataType, invalidCharMask);
+ }
+ return dataType;
+ }
+
+ // TODO: Rebase this to use InternalSchema when it is ready.
Review Comment:
Can you file a follow-up JIRA for this?
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java:
##########
@@ -48,10 +56,120 @@
*/
public final class SourceFormatAdapter implements Closeable {
+ public static class SourceFormatAdapterConfig extends HoodieConfig {
+ // sanitizes invalid columns both in the data read from source and also in
the schema.
+ // invalid definition here goes by avro naming convention
(https://avro.apache.org/docs/current/spec.html#names).
+ public static final ConfigProperty<Boolean> SANITIZE_AVRO_FIELD_NAMES =
ConfigProperty
Review Comment:
You can change the config key accordingly as well.
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java:
##########
@@ -50,19 +67,98 @@ public static class Config {
protected Schema targetSchema;
+ private static List<Object> transformList(List<Object> src, String
invalidCharMask) {
+ return src.stream().map(obj -> {
+ if (obj instanceof List) {
+ return transformList((List<Object>) obj, invalidCharMask);
+ } else if (obj instanceof Map) {
+ return transformMap((Map<String, Object>) obj, invalidCharMask);
+ } else {
+ return obj;
+ }
+ }).collect(Collectors.toList());
+ }
+
+ private static Map<String, Object> transformMap(Map<String, Object> src,
String invalidCharMask) {
+ return src.entrySet().stream()
+ .map(kv -> {
+ if (kv.getValue() instanceof List) {
+ return Pair.of(kv.getKey(), transformList((List<Object>)
kv.getValue(), invalidCharMask));
+ } else if (kv.getValue() instanceof Map) {
+ return Pair.of(kv.getKey(), transformMap((Map<String, Object>)
kv.getValue(), invalidCharMask));
+ } else if (kv.getValue() instanceof String) {
+ String currentStrValue = (String) kv.getValue();
+ if (kv.getKey().equals(AVRO_FIELD_NAME_KEY)) {
+ return Pair.of(kv.getKey(),
HoodieAvroUtils.sanitizeName(currentStrValue, invalidCharMask));
+ }
+ return Pair.of(kv.getKey(), currentStrValue);
+ } else {
+ return Pair.of(kv.getKey(), kv.getValue());
+ }
+ }).collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
+ }
+
+ private static Option<Schema> parseSanitizedAvroSchemaNoThrow(String
schemaStr, String invalidCharMask) {
+ try {
+ ObjectMapper objectMapper = new ObjectMapper();
+ objectMapper.enable(JsonParser.Feature.ALLOW_COMMENTS);
+ Map<String, Object> objMap = objectMapper.readValue(schemaStr,
Map.class);
+ Map<String, Object> modifiedMap = transformMap(objMap, invalidCharMask);
+ return Option.of(new
Schema.Parser().parse(objectMapper.writeValueAsString(modifiedMap)));
+ } catch (Exception ex) {
+ return Option.empty();
+ }
+ }
+
+ /*
+ * We first rely on Avro to parse and then try to rename only for those
failed.
+ * This way we can improve our parsing capabilities without breaking
existing functionality.
+ * For example we don't yet support multiple named schemas defined in a file.
+ */
+ private static Schema parseAvroSchema(String schemaStr, boolean
sanitizeSchema, String invalidCharMask) {
+ try {
+ return new Schema.Parser().parse(schemaStr);
+ } catch (SchemaParseException spe) {
+ // if sanitizing is not enabled rethrow the exception.
+ if (!sanitizeSchema) {
+ throw spe;
+ }
+ // Rename avro fields and try parsing once again.
+ Option<Schema> parseResult = parseSanitizedAvroSchemaNoThrow(schemaStr,
invalidCharMask);
+ if (!parseResult.isPresent()) {
+ // throw original exception.
+ throw spe;
+ }
+ return parseResult.get();
+ }
+ }
+
+ private static Schema readAvroSchemaFromFile(String schemaPath, FileSystem
fs, boolean sanitizeSchema, String invalidCharMask) {
+ String schemaStr;
+ FSDataInputStream in = null;
+ try {
Review Comment:
Using try-with-resources might reduce the number of lines here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]