nsivabalan commented on code in PR #8010:
URL: https://github.com/apache/hudi/pull/8010#discussion_r1113419832
##########
hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java:
##########
@@ -720,10 +721,22 @@ public static Schema getNullSchema() {
* @return sanitized name
*/
public static String sanitizeName(String name) {
- if (name.substring(0, 1).matches(INVALID_AVRO_FIRST_CHAR_IN_NAMES)) {
- name = name.replaceFirst(INVALID_AVRO_FIRST_CHAR_IN_NAMES, MASK_FOR_INVALID_CHARS_IN_NAMES);
+ return sanitizeName(name, MASK_FOR_INVALID_CHARS_IN_NAMES);
+ }
+
+ /**
+ * Sanitizes Name according to Avro rule for names.
+ * Removes characters other than the ones mentioned in https://avro.apache.org/docs/current/spec.html#names .
+ *
+ * @param name input name
+ * @param invalidCharMask replacement for invalid characters.
+ * @return sanitized name
+ */
+ public static String sanitizeName(String name, String invalidCharMask) {
+ if (INVALID_AVRO_FIRST_CHAR_IN_NAMES_PATTERN.matcher(name.substring(0, 1)).matches()) {
Review Comment:
shouldn't this be inverse?
if the first char matches ^0-9, or if the first char does not match ^A-Za-z, we should replace. can you help me understand what I am missing here?
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java:
##########
@@ -124,6 +261,7 @@ public InputBatch<Dataset<Row>> fetchNewDataInRowFormat(Option<String> lastCkptS
r.getCheckpointForNextBatch(), r.getSchemaProvider());
}
case PROTO: {
+ //sanitizing is not done, but could be implemented if needed
Review Comment:
we can do checkArg and throw an exception if sanitization is enabled for the proto source.
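A minimal sketch of that guard, assuming org.apache.hudi.common.util.ValidationUtils and the isNameSanitizingEnabled() accessor introduced in this PR (class name and message below are illustrative):

```java
import org.apache.hudi.common.util.ValidationUtils;

public class ProtoSanitizationGuard {
  // Hypothetical check the PROTO branch could run before fetching.
  static void verifySanitizationDisabled(boolean nameSanitizingEnabled) {
    ValidationUtils.checkArgument(!nameSanitizingEnabled,
        "Field name sanitization is not supported for PROTO sources yet");
  }
}
```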
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java:
##########
@@ -49,9 +57,119 @@
public final class SourceFormatAdapter implements Closeable {
private final Source source;
+ private final SourceFormatAdapterConfig config;
+
+ public static class SourceFormatAdapterConfig extends HoodieConfig {
Review Comment:
let's add the config annotation.
let's move this to a separate class and not embed it here.
also, maybe we should name this DeltaStreamerSourceConfigs.
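One possible shape for the extracted class, as a sketch only; the @ConfigClassProperty values and the class name follow the suggestion above and are assumptions, not settled API:

```java
import org.apache.hudi.common.config.ConfigClassProperty;
import org.apache.hudi.common.config.ConfigGroups;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieConfig;

// Illustrative extraction of the nested config class proposed in this PR.
@ConfigClassProperty(name = "DeltaStreamer Source Configs",
    groupName = ConfigGroups.Names.DELTA_STREAMER,
    description = "Configurations controlling field name sanitization for DeltaStreamer sources.")
public class DeltaStreamerSourceConfigs extends HoodieConfig {
  public static final ConfigProperty<String> SANITIZE_SCHEMA_FIELD_NAMES = ConfigProperty
      .key("hoodie.deltastreamer.source.sanitize.invalid.schema.field.names")
      .defaultValue("false")
      .withDocumentation("Sanitizes invalid schema field names both in the data and also in the schema");

  public static final ConfigProperty<String> SCHEMA_FIELD_NAME_INVALID_CHAR_MASK = ConfigProperty
      .key("hoodie.deltastreamer.source.sanitize.invalid.char.mask")
      .defaultValue("__")
      .withDocumentation("Character mask to be used as replacement for invalid field names");
}
```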
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java:
##########
@@ -60,15 +178,20 @@ public SourceFormatAdapter(Source source) {
public InputBatch<JavaRDD<GenericRecord>> fetchNewDataInAvroFormat(Option<String> lastCkptStr, long sourceLimit) {
switch (source.getSourceType()) {
case AVRO:
+ //don't need to sanitize because it's already avro
return ((Source<JavaRDD<GenericRecord>>) source).fetchNext(lastCkptStr, sourceLimit);
case JSON: {
+ //sanitizing is done inside the convertor
Review Comment:
nit: end the doc with "if enabled".
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java:
##########
@@ -79,4 +92,80 @@ public Schema getTargetSchema() {
return super.getTargetSchema();
}
}
+
+ private static Schema readAvroSchemaFromFile(String schemaPath, FileSystem fs, boolean sanitizeSchema, String invalidCharMask) {
+ String schemaStr;
+ try (FSDataInputStream in = fs.open(new Path(schemaPath))) {
+ schemaStr = FileIOUtils.readAsUTFString(in);
+ } catch (IOException ioe) {
+ throw new HoodieIOException(String.format("Error reading schema from file %s", schemaPath), ioe);
+ }
+ return parseAvroSchema(schemaStr, sanitizeSchema, invalidCharMask);
+ }
+
+ /*
+ * We first rely on Avro to parse and then try to rename only for those failed.
+ * This way we can improve our parsing capabilities without breaking existing functionality.
+ * For example, we don't yet support multiple named schemas defined in a file.
+ */
+ private static Schema parseAvroSchema(String schemaStr, boolean sanitizeSchema, String invalidCharMask) {
Review Comment:
let's try to use the same variable name and argument name across the code base.
e.g., shouldSanitize and sanitizeSchema should be unified.
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java:
##########
@@ -79,4 +92,80 @@ public Schema getTargetSchema() {
return super.getTargetSchema();
}
}
+
+ private static Schema readAvroSchemaFromFile(String schemaPath, FileSystem fs, boolean sanitizeSchema, String invalidCharMask) {
+ String schemaStr;
+ try (FSDataInputStream in = fs.open(new Path(schemaPath))) {
+ schemaStr = FileIOUtils.readAsUTFString(in);
+ } catch (IOException ioe) {
+ throw new HoodieIOException(String.format("Error reading schema from file %s", schemaPath), ioe);
+ }
+ return parseAvroSchema(schemaStr, sanitizeSchema, invalidCharMask);
+ }
+
+ /*
+ * We first rely on Avro to parse and then try to rename only for those failed.
+ * This way we can improve our parsing capabilities without breaking existing functionality.
+ * For example, we don't yet support multiple named schemas defined in a file.
+ */
+ private static Schema parseAvroSchema(String schemaStr, boolean sanitizeSchema, String invalidCharMask) {
+ try {
+ return new Schema.Parser().parse(schemaStr);
+ } catch (SchemaParseException spe) {
+ // if sanitizing is not enabled rethrow the exception.
+ if (!sanitizeSchema) {
+ throw spe;
+ }
+ // Rename avro fields and try parsing once again.
+ Option<Schema> parseResult = parseSanitizedAvroSchemaNoThrow(schemaStr, invalidCharMask);
+ if (!parseResult.isPresent()) {
+ // throw original exception.
+ throw spe;
+ }
+ return parseResult.get();
+ }
+ }
+
+ private static List<Object> transformList(List<Object> src, String invalidCharMask) {
Review Comment:
let's add Javadocs.
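For instance, something along these lines (wording is a suggestion only, inferred from the method's use in schema sanitization):

```java
/**
 * Recursively walks a list parsed from the schema JSON and transforms any
 * nested lists and maps, leaving other entries untouched.
 *
 * @param src             list node from the parsed schema JSON
 * @param invalidCharMask replacement for invalid characters in names
 * @return a transformed copy of the list
 */
```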
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java:
##########
@@ -49,9 +57,119 @@
public final class SourceFormatAdapter implements Closeable {
private final Source source;
+ private final SourceFormatAdapterConfig config;
+
+ public static class SourceFormatAdapterConfig extends HoodieConfig {
+ // sanitizes names of invalid schema fields both in the data read from source and also in the schema.
+ // invalid definition here goes by avro naming convention (https://avro.apache.org/docs/current/spec.html#names).
+ public static final ConfigProperty<String> SANITIZE_SCHEMA_FIELD_NAMES = ConfigProperty
+ .key("hoodie.deltastreamer.source.sanitize.invalid.schema.field.names")
+ .defaultValue("false")
+ .withDocumentation("Sanitizes invalid schema field names both in the data and also in the schema");
+
+ public static final ConfigProperty<String> SCHEMA_FIELD_NAME_INVALID_CHAR_MASK = ConfigProperty
+ .key("hoodie.deltastreamer.source.sanitize.invalid.char.mask")
+ .defaultValue("__")
+ .withDocumentation("Character mask to be used as replacement for invalid field names");
+
+ public SourceFormatAdapterConfig() {
+ super();
+ }
+
+ public SourceFormatAdapterConfig(TypedProperties props) {
+ super(props);
+ }
+ }
public SourceFormatAdapter(Source source) {
+ this(source, Option.empty());
+ }
+
+ public SourceFormatAdapter(Source source,
+ Option<TypedProperties> props) {
this.source = source;
+ this.config = props.isPresent() ? new SourceFormatAdapterConfig((props.get())) : new SourceFormatAdapterConfig();
+ }
+
+ /**
+ * Config that automatically sanitizes the field names as per avro naming rules.
+ * @return enabled status.
+ */
+ private boolean isNameSanitizingEnabled() {
+ return config.getBooleanOrDefault(SourceFormatAdapterConfig.SANITIZE_SCHEMA_FIELD_NAMES);
+ }
+
+ /**
+ * Replacement mask for invalid characters encountered in avro names.
+ * @return sanitized value.
+ */
+ private String getInvalidCharMask() {
+ return config.getStringOrDefault(SourceFormatAdapterConfig.SCHEMA_FIELD_NAME_INVALID_CHAR_MASK);
+ }
+
+ private static DataType sanitizeDataTypeForAvro(DataType dataType, String invalidCharMask) {
+ if (dataType instanceof ArrayType) {
+ ArrayType arrayType = (ArrayType) dataType;
+ DataType sanitizedDataType = sanitizeDataTypeForAvro(arrayType.elementType(), invalidCharMask);
+ return new ArrayType(sanitizedDataType, arrayType.containsNull());
+ } else if (dataType instanceof MapType) {
+ MapType mapType = (MapType) dataType;
+ DataType sanitizedKeyDataType = sanitizeDataTypeForAvro(mapType.keyType(), invalidCharMask);
+ DataType sanitizedValueDataType = sanitizeDataTypeForAvro(mapType.valueType(), invalidCharMask);
+ return new MapType(sanitizedKeyDataType, sanitizedValueDataType, mapType.valueContainsNull());
+ } else if (dataType instanceof StructType) {
+ return sanitizeStructTypeForAvro((StructType) dataType, invalidCharMask);
+ }
+ return dataType;
+ }
+
+ // TODO(HUDI-5256): Refactor this to use InternalSchema when it is ready.
+ private static StructType sanitizeStructTypeForAvro(StructType structType, String invalidCharMask) {
+ StructType sanitizedStructType = new StructType();
+ StructField[] structFields = structType.fields();
+ for (StructField s : structFields) {
+ DataType currFieldDataTypeSanitized = sanitizeDataTypeForAvro(s.dataType(), invalidCharMask);
+ StructField structFieldCopy = new StructField(HoodieAvroUtils.sanitizeName(s.name(), invalidCharMask),
+ currFieldDataTypeSanitized, s.nullable(), s.metadata());
+ sanitizedStructType = sanitizedStructType.add(structFieldCopy);
+ }
+ return sanitizedStructType;
+ }
+
+ private static Dataset<Row> sanitizeColumnNamesForAvro(Dataset<Row> inputDataset, String invalidCharMask) {
Review Comment:
let's move these static methods to some utils class, SanitizeUtils or something, and let's keep the SourceFormatAdapter lean.
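A skeleton of the suggested extraction; the class name follows the comment above and the package is an assumption for illustration:

```java
package org.apache.hudi.utilities.sources.helpers;

// Skeleton only: the static helpers from SourceFormatAdapter
// (sanitizeDataTypeForAvro, sanitizeStructTypeForAvro,
// sanitizeColumnNamesForAvro, trySanitizeFieldNames) would move here verbatim.
public final class SanitizeUtils {

  private SanitizeUtils() {
    // static utility class, no instances
  }
}
```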
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java:
##########
@@ -49,9 +57,119 @@
public final class SourceFormatAdapter implements Closeable {
private final Source source;
+ private final SourceFormatAdapterConfig config;
+
+ public static class SourceFormatAdapterConfig extends HoodieConfig {
+ // sanitizes names of invalid schema fields both in the data read from source and also in the schema.
+ // invalid definition here goes by avro naming convention (https://avro.apache.org/docs/current/spec.html#names).
+ public static final ConfigProperty<String> SANITIZE_SCHEMA_FIELD_NAMES = ConfigProperty
+ .key("hoodie.deltastreamer.source.sanitize.invalid.schema.field.names")
+ .defaultValue("false")
+ .withDocumentation("Sanitizes invalid schema field names both in the data and also in the schema");
+
+ public static final ConfigProperty<String> SCHEMA_FIELD_NAME_INVALID_CHAR_MASK = ConfigProperty
+ .key("hoodie.deltastreamer.source.sanitize.invalid.char.mask")
+ .defaultValue("__")
+ .withDocumentation("Character mask to be used as replacement for invalid field names");
+
+ public SourceFormatAdapterConfig() {
+ super();
+ }
+
+ public SourceFormatAdapterConfig(TypedProperties props) {
+ super(props);
+ }
+ }
public SourceFormatAdapter(Source source) {
+ this(source, Option.empty());
+ }
+
+ public SourceFormatAdapter(Source source,
+ Option<TypedProperties> props) {
this.source = source;
+ this.config = props.isPresent() ? new SourceFormatAdapterConfig((props.get())) : new SourceFormatAdapterConfig();
+ }
+
+ /**
+ * Config that automatically sanitizes the field names as per avro naming rules.
+ * @return enabled status.
+ */
+ private boolean isNameSanitizingEnabled() {
Review Comment:
isFieldNameSanitizationEnabled
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java:
##########
@@ -60,15 +178,20 @@ public SourceFormatAdapter(Source source) {
public InputBatch<JavaRDD<GenericRecord>> fetchNewDataInAvroFormat(Option<String> lastCkptStr, long sourceLimit) {
switch (source.getSourceType()) {
case AVRO:
+ //don't need to sanitize because it's already avro
return ((Source<JavaRDD<GenericRecord>>) source).fetchNext(lastCkptStr, sourceLimit);
case JSON: {
+ //sanitizing is done inside the convertor
InputBatch<JavaRDD<String>> r = ((Source<JavaRDD<String>>) source).fetchNext(lastCkptStr, sourceLimit);
- AvroConvertor convertor = new AvroConvertor(r.getSchemaProvider().getSourceSchema());
+ AvroConvertor convertor = new AvroConvertor(r.getSchemaProvider().getSourceSchema(), isNameSanitizingEnabled(), getInvalidCharMask());
return new InputBatch<>(Option.ofNullable(r.getBatch().map(rdd -> rdd.map(convertor::fromJson)).orElse(null)),
- r.getCheckpointForNextBatch(), r.getSchemaProvider());
+ r.getCheckpointForNextBatch(),
+ r.getSchemaProvider());
}
case ROW: {
- InputBatch<Dataset<Row>> r = ((Source<Dataset<Row>>) source).fetchNext(lastCkptStr, sourceLimit);
+ //we do the sanitizing here
Review Comment:
nit: end the doc with "if enabled".
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java:
##########
@@ -94,27 +218,40 @@ public InputBatch<JavaRDD<GenericRecord>> fetchNewDataInAvroFormat(Option<String
}
}
+ private InputBatch<Dataset<Row>> avroDataInRowFormat(InputBatch<JavaRDD<GenericRecord>> r) {
+ Schema sourceSchema = r.getSchemaProvider().getSourceSchema();
+ return new InputBatch<>(
+ Option
+ .ofNullable(
+ r.getBatch()
+ .map(rdd -> AvroConversionUtils.createDataFrame(JavaRDD.toRDD(rdd), sourceSchema.toString(),
+ source.getSparkSession())
+ )
+ .orElse(null)),
+ r.getCheckpointForNextBatch(), r.getSchemaProvider());
+ }
+
/**
* Fetch new data in row format. If the source provides data in different format, they are translated to Row format
*/
public InputBatch<Dataset<Row>> fetchNewDataInRowFormat(Option<String> lastCkptStr, long sourceLimit) {
switch (source.getSourceType()) {
case ROW:
- return ((Source<Dataset<Row>>) source).fetchNext(lastCkptStr, sourceLimit);
+ //we do the sanitizing here
+ return trySanitizeFieldNames(((Source<Dataset<Row>>) source).fetchNext(lastCkptStr, sourceLimit), isNameSanitizingEnabled(), getInvalidCharMask());
case AVRO: {
+ //don't need to sanitize because it's already avro
InputBatch<JavaRDD<GenericRecord>> r = ((Source<JavaRDD<GenericRecord>>) source).fetchNext(lastCkptStr, sourceLimit);
- Schema sourceSchema = r.getSchemaProvider().getSourceSchema();
- return new InputBatch<>(
- Option
- .ofNullable(
- r.getBatch()
- .map(rdd -> AvroConversionUtils.createDataFrame(JavaRDD.toRDD(rdd), sourceSchema.toString(),
- source.getSparkSession())
- )
- .orElse(null)),
- r.getCheckpointForNextBatch(), r.getSchemaProvider());
+ return avroDataInRowFormat(r);
}
case JSON: {
+ if (isNameSanitizingEnabled()) {
+ //leverage the json -> avro sanitizing, If more speed is needed, then specific json -> row with sanitizing is possible
Review Comment:
can we file a follow-up ticket on this?
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java:
##########
@@ -94,27 +218,40 @@ public InputBatch<JavaRDD<GenericRecord>> fetchNewDataInAvroFormat(Option<String
}
}
+ private InputBatch<Dataset<Row>> avroDataInRowFormat(InputBatch<JavaRDD<GenericRecord>> r) {
+ Schema sourceSchema = r.getSchemaProvider().getSourceSchema();
+ return new InputBatch<>(
+ Option
+ .ofNullable(
+ r.getBatch()
+ .map(rdd -> AvroConversionUtils.createDataFrame(JavaRDD.toRDD(rdd), sourceSchema.toString(),
+ source.getSparkSession())
+ )
+ .orElse(null)),
+ r.getCheckpointForNextBatch(), r.getSchemaProvider());
+ }
+
/**
* Fetch new data in row format. If the source provides data in different format, they are translated to Row format
*/
public InputBatch<Dataset<Row>> fetchNewDataInRowFormat(Option<String> lastCkptStr, long sourceLimit) {
switch (source.getSourceType()) {
case ROW:
- return ((Source<Dataset<Row>>) source).fetchNext(lastCkptStr, sourceLimit);
+ //we do the sanitizing here
+ return trySanitizeFieldNames(((Source<Dataset<Row>>) source).fetchNext(lastCkptStr, sourceLimit), isNameSanitizingEnabled(), getInvalidCharMask());
case AVRO: {
+ //don't need to sanitize because it's already avro
InputBatch<JavaRDD<GenericRecord>> r = ((Source<JavaRDD<GenericRecord>>) source).fetchNext(lastCkptStr, sourceLimit);
- Schema sourceSchema = r.getSchemaProvider().getSourceSchema();
- return new InputBatch<>(
- Option
- .ofNullable(
- r.getBatch()
- .map(rdd -> AvroConversionUtils.createDataFrame(JavaRDD.toRDD(rdd), sourceSchema.toString(),
- source.getSparkSession())
- )
- .orElse(null)),
- r.getCheckpointForNextBatch(), r.getSchemaProvider());
+ return avroDataInRowFormat(r);
}
case JSON: {
+ if (isNameSanitizingEnabled()) {
+ //leverage the json -> avro sanitizing, If more speed is needed, then specific json -> row with sanitizing is possible
Review Comment:
nit. "if more speed is needed" -> "Its feasible to optimize this conversion
directly from json to avro. More details in HUDI-XXXX"
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java:
##########
@@ -49,9 +57,119 @@
public final class SourceFormatAdapter implements Closeable {
private final Source source;
+ private final SourceFormatAdapterConfig config;
+
+ public static class SourceFormatAdapterConfig extends HoodieConfig {
+ // sanitizes names of invalid schema fields both in the data read from source and also in the schema.
+ // invalid definition here goes by avro naming convention (https://avro.apache.org/docs/current/spec.html#names).
+ public static final ConfigProperty<String> SANITIZE_SCHEMA_FIELD_NAMES = ConfigProperty
+ .key("hoodie.deltastreamer.source.sanitize.invalid.schema.field.names")
+ .defaultValue("false")
+ .withDocumentation("Sanitizes invalid schema field names both in the data and also in the schema");
+
+ public static final ConfigProperty<String> SCHEMA_FIELD_NAME_INVALID_CHAR_MASK = ConfigProperty
+ .key("hoodie.deltastreamer.source.sanitize.invalid.char.mask")
+ .defaultValue("__")
+ .withDocumentation("Character mask to be used as replacement for invalid field names");
+
+ public SourceFormatAdapterConfig() {
+ super();
+ }
+
+ public SourceFormatAdapterConfig(TypedProperties props) {
+ super(props);
+ }
+ }
public SourceFormatAdapter(Source source) {
+ this(source, Option.empty());
+ }
+
+ public SourceFormatAdapter(Source source,
+ Option<TypedProperties> props) {
this.source = source;
+ this.config = props.isPresent() ? new SourceFormatAdapterConfig((props.get())) : new SourceFormatAdapterConfig();
+ }
+
+ /**
+ * Config that automatically sanitizes the field names as per avro naming rules.
+ * @return enabled status.
+ */
+ private boolean isNameSanitizingEnabled() {
+ return config.getBooleanOrDefault(SourceFormatAdapterConfig.SANITIZE_SCHEMA_FIELD_NAMES);
+ }
+
+ /**
+ * Replacement mask for invalid characters encountered in avro names.
+ * @return sanitized value.
+ */
+ private String getInvalidCharMask() {
+ return config.getStringOrDefault(SourceFormatAdapterConfig.SCHEMA_FIELD_NAME_INVALID_CHAR_MASK);
+ }
+
+ private static DataType sanitizeDataTypeForAvro(DataType dataType, String invalidCharMask) {
+ if (dataType instanceof ArrayType) {
+ ArrayType arrayType = (ArrayType) dataType;
+ DataType sanitizedDataType = sanitizeDataTypeForAvro(arrayType.elementType(), invalidCharMask);
+ return new ArrayType(sanitizedDataType, arrayType.containsNull());
+ } else if (dataType instanceof MapType) {
+ MapType mapType = (MapType) dataType;
+ DataType sanitizedKeyDataType = sanitizeDataTypeForAvro(mapType.keyType(), invalidCharMask);
+ DataType sanitizedValueDataType = sanitizeDataTypeForAvro(mapType.valueType(), invalidCharMask);
+ return new MapType(sanitizedKeyDataType, sanitizedValueDataType, mapType.valueContainsNull());
+ } else if (dataType instanceof StructType) {
+ return sanitizeStructTypeForAvro((StructType) dataType, invalidCharMask);
+ }
+ return dataType;
+ }
+
+ // TODO(HUDI-5256): Refactor this to use InternalSchema when it is ready.
+ private static StructType sanitizeStructTypeForAvro(StructType structType, String invalidCharMask) {
+ StructType sanitizedStructType = new StructType();
+ StructField[] structFields = structType.fields();
+ for (StructField s : structFields) {
+ DataType currFieldDataTypeSanitized = sanitizeDataTypeForAvro(s.dataType(), invalidCharMask);
+ StructField structFieldCopy = new StructField(HoodieAvroUtils.sanitizeName(s.name(), invalidCharMask),
+ currFieldDataTypeSanitized, s.nullable(), s.metadata());
+ sanitizedStructType = sanitizedStructType.add(structFieldCopy);
+ }
+ return sanitizedStructType;
+ }
+
+ private static Dataset<Row> sanitizeColumnNamesForAvro(Dataset<Row> inputDataset, String invalidCharMask) {
+ StructField[] inputFields = inputDataset.schema().fields();
+ Dataset<Row> targetDataset = inputDataset;
+ for (StructField sf : inputFields) {
+ DataType sanitizedFieldDataType = sanitizeDataTypeForAvro(sf.dataType(), invalidCharMask);
+ if (!sanitizedFieldDataType.equals(sf.dataType())) {
+ // Sanitizing column names for nested types can be thought of as going from one schema to another
+ // which are structurally similar except for actual column names itself. So casting is safe and sufficient.
+ targetDataset = targetDataset.withColumn(sf.name(), targetDataset.col(sf.name()).cast(sanitizedFieldDataType));
+ }
+ String possibleRename = HoodieAvroUtils.sanitizeName(sf.name(), invalidCharMask);
+ if (!sf.name().equals(possibleRename)) {
+ targetDataset = targetDataset.withColumnRenamed(sf.name(), possibleRename);
+ }
+ }
+ return targetDataset;
+ }
+
+ /**
+ * Sanitize all columns including nested ones as per Avro conventions.
+ * @param srcBatch
+ * @param shouldSanitize
+ * @param invalidCharMask
+ * @return sanitized batch.
+ */
+ private static InputBatch<Dataset<Row>> trySanitizeFieldNames(InputBatch<Dataset<Row>> srcBatch,
Review Comment:
maybeSanitizeFieldNames
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java:
##########
@@ -79,4 +92,80 @@ public Schema getTargetSchema() {
return super.getTargetSchema();
}
}
+
+ private static Schema readAvroSchemaFromFile(String schemaPath, FileSystem fs, boolean sanitizeSchema, String invalidCharMask) {
+ String schemaStr;
+ try (FSDataInputStream in = fs.open(new Path(schemaPath))) {
+ schemaStr = FileIOUtils.readAsUTFString(in);
+ } catch (IOException ioe) {
+ throw new HoodieIOException(String.format("Error reading schema from file %s", schemaPath), ioe);
+ }
+ return parseAvroSchema(schemaStr, sanitizeSchema, invalidCharMask);
+ }
+
+ /*
+ * We first rely on Avro to parse and then try to rename only for those failed.
+ * This way we can improve our parsing capabilities without breaking existing functionality.
+ * For example, we don't yet support multiple named schemas defined in a file.
+ */
+ private static Schema parseAvroSchema(String schemaStr, boolean sanitizeSchema, String invalidCharMask) {
+ try {
+ return new Schema.Parser().parse(schemaStr);
+ } catch (SchemaParseException spe) {
+ // if sanitizing is not enabled rethrow the exception.
+ if (!sanitizeSchema) {
+ throw spe;
+ }
+ // Rename avro fields and try parsing once again.
+ Option<Schema> parseResult = parseSanitizedAvroSchemaNoThrow(schemaStr, invalidCharMask);
+ if (!parseResult.isPresent()) {
+ // throw original exception.
+ throw spe;
+ }
+ return parseResult.get();
+ }
+ }
+
+ private static List<Object> transformList(List<Object> src, String invalidCharMask) {
+ return src.stream().map(obj -> {
+ if (obj instanceof List) {
+ return transformList((List<Object>) obj, invalidCharMask);
+ } else if (obj instanceof Map) {
+ return transformMap((Map<String, Object>) obj, invalidCharMask);
+ } else {
+ return obj;
+ }
+ }).collect(Collectors.toList());
+ }
+
+ private static Map<String, Object> transformMap(Map<String, Object> src, String invalidCharMask) {
Review Comment:
I will leave it to you to take the call. let's see if it makes sense to move these also to SanitizationUtils.