yihua commented on code in PR #11265:
URL: https://github.com/apache/hudi/pull/11265#discussion_r1690295012
##########
hudi-common/src/main/java/org/apache/hudi/avro/MercifulJsonConverter.java:
##########
@@ -187,196 +180,772 @@ private static Object convertJsonToAvroField(Object
value, String name, Schema s
throw new HoodieJsonToAvroConversionException(null, name, schema,
shouldSanitize, invalidCharMask);
}
- JsonToAvroFieldProcessor processor =
FIELD_TYPE_PROCESSORS.get(schema.getType());
- if (null != processor) {
- return processor.convertToAvro(value, name, schema, shouldSanitize,
invalidCharMask);
- }
- throw new IllegalArgumentException("JsonConverter cannot handle type: " +
schema.getType());
+ return JsonToAvroFieldProcessorUtil.convertToAvro(value, name, schema,
shouldSanitize, invalidCharMask);
}
- /**
- * Base Class for converting json to avro fields.
- */
- private abstract static class JsonToAvroFieldProcessor implements
Serializable {
+ private static class JsonToAvroFieldProcessorUtil {
+ /**
+ * Base Class for converting json to avro fields.
+ */
+ private abstract static class JsonToAvroFieldProcessor implements
Serializable {
- public Object convertToAvro(Object value, String name, Schema schema,
boolean shouldSanitize, String invalidCharMask) {
- Pair<Boolean, Object> res = convert(value, name, schema, shouldSanitize,
invalidCharMask);
- if (!res.getLeft()) {
- throw new HoodieJsonToAvroConversionException(value, name, schema,
shouldSanitize, invalidCharMask);
+ public Object convertToAvro(Object value, String name, Schema schema,
boolean shouldSanitize, String invalidCharMask) {
+ Pair<Boolean, Object> res = convert(value, name, schema,
shouldSanitize, invalidCharMask);
+ if (!res.getLeft()) {
+ throw new HoodieJsonToAvroConversionException(value, name, schema,
shouldSanitize, invalidCharMask);
+ }
+ return res.getRight();
}
- return res.getRight();
+
+ protected abstract Pair<Boolean, Object> convert(Object value, String
name, Schema schema, boolean shouldSanitize, String invalidCharMask);
}
- protected abstract Pair<Boolean, Object> convert(Object value, String
name, Schema schema, boolean shouldSanitize, String invalidCharMask);
- }
+ public static Object convertToAvro(Object value, String name, Schema
schema, boolean shouldSanitize, String invalidCharMask) {
+ JsonToAvroFieldProcessor processor = getProcessorForSchema(schema);
+ return processor.convertToAvro(value, name, schema, shouldSanitize,
invalidCharMask);
+ }
+
+ private static JsonToAvroFieldProcessor getProcessorForSchema(Schema
schema) {
+ JsonToAvroFieldProcessor processor = null;
+
+ // 3 cases to consider: customized logicalType, logicalType, and type.
+ String customizedLogicalType = schema.getProp("logicalType");
+ LogicalType logicalType = schema.getLogicalType();
+ Type type = schema.getType();
+ if (customizedLogicalType != null && !customizedLogicalType.isEmpty()) {
+ processor =
AVRO_LOGICAL_TYPE_FIELD_PROCESSORS.get(customizedLogicalType);
+ } else if (logicalType != null) {
+ processor =
AVRO_LOGICAL_TYPE_FIELD_PROCESSORS.get(logicalType.getName());
+ } else {
+ processor = AVRO_TYPE_FIELD_TYPE_PROCESSORS.get(type);
+ }
+
+ ValidationUtils.checkArgument(
+ processor != null, String.format("JsonConverter cannot handle type:
%s", type));
+ return processor;
+ }
+
+ // Avro primitive and complex type processors.
+ private static final Map<Schema.Type, JsonToAvroFieldProcessor>
AVRO_TYPE_FIELD_TYPE_PROCESSORS = getFieldTypeProcessors();
+ // Avro logical type processors.
+ private static final Map<String, JsonToAvroFieldProcessor>
AVRO_LOGICAL_TYPE_FIELD_PROCESSORS = getLogicalFieldTypeProcessors();
+
+ /**
+ * Build type processor map for each avro type.
+ */
+ private static Map<Schema.Type, JsonToAvroFieldProcessor>
getFieldTypeProcessors() {
+ Map<Schema.Type, JsonToAvroFieldProcessor> fieldTypeProcessors = new
EnumMap<>(Schema.Type.class);
+ fieldTypeProcessors.put(Type.STRING, generateStringTypeHandler());
+ fieldTypeProcessors.put(Type.BOOLEAN, generateBooleanTypeHandler());
+ fieldTypeProcessors.put(Type.DOUBLE, generateDoubleTypeHandler());
+ fieldTypeProcessors.put(Type.FLOAT, generateFloatTypeHandler());
+ fieldTypeProcessors.put(Type.INT, generateIntTypeHandler());
+ fieldTypeProcessors.put(Type.LONG, generateLongTypeHandler());
+ fieldTypeProcessors.put(Type.ARRAY, generateArrayTypeHandler());
+ fieldTypeProcessors.put(Type.RECORD, generateRecordTypeHandler());
+ fieldTypeProcessors.put(Type.ENUM, generateEnumTypeHandler());
+ fieldTypeProcessors.put(Type.MAP, generateMapTypeHandler());
+ fieldTypeProcessors.put(Type.BYTES, generateBytesTypeHandler());
+ fieldTypeProcessors.put(Type.FIXED, generateFixedTypeHandler());
+ return Collections.unmodifiableMap(fieldTypeProcessors);
+ }
- private static JsonToAvroFieldProcessor generateBooleanTypeHandler() {
- return new JsonToAvroFieldProcessor() {
+ private static Map<String, JsonToAvroFieldProcessor>
getLogicalFieldTypeProcessors() {
+ return CollectionUtils.createImmutableMap(
+ Pair.of(AvroLogicalTypeEnum.DECIMAL.getValue(), new
DecimalLogicalTypeProcessor()),
+ Pair.of(AvroLogicalTypeEnum.TIME_MICROS.getValue(), new
TimeMicroLogicalTypeProcessor()),
+ Pair.of(AvroLogicalTypeEnum.TIME_MILLIS.getValue(), new
TimeMilliLogicalTypeProcessor()),
+ Pair.of(AvroLogicalTypeEnum.DATE.getValue(), new
DateLogicalTypeProcessor()),
+ Pair.of(AvroLogicalTypeEnum.LOCAL_TIMESTAMP_MICROS.getValue(), new
LocalTimestampMicroLogicalTypeProcessor()),
+ Pair.of(AvroLogicalTypeEnum.LOCAL_TIMESTAMP_MILLIS.getValue(), new
LocalTimestampMilliLogicalTypeProcessor()),
+ Pair.of(AvroLogicalTypeEnum.TIMESTAMP_MICROS.getValue(), new
TimestampMicroLogicalTypeProcessor()),
+ Pair.of(AvroLogicalTypeEnum.TIMESTAMP_MILLIS.getValue(), new
TimestampMilliLogicalTypeProcessor()),
+ Pair.of(AvroLogicalTypeEnum.DURATION.getValue(), new
DurationLogicalTypeProcessor()),
+ Pair.of(AvroLogicalTypeEnum.UUID.getValue(),
generateStringTypeHandler()));
+ }
+
+ private static class DecimalLogicalTypeProcessor extends
JsonToAvroFieldProcessor {
@Override
public Pair<Boolean, Object> convert(Object value, String name, Schema
schema, boolean shouldSanitize, String invalidCharMask) {
- if (value instanceof Boolean) {
- return Pair.of(true, value);
+
+ if (!isValidDecimalTypeConfig(schema)) {
+ return Pair.of(false, null);
+ }
+
+ // Case 1: Input is a list. It is expected to be raw Fixed byte array
input, and we only support
+ // parsing it to Fixed avro type.
+ if (value instanceof List<?> && schema.getType() == Type.FIXED) {
+ JsonToAvroFieldProcessor processor = generateFixedTypeHandler();
+ return processor.convert(value, name, schema, shouldSanitize,
invalidCharMask);
+ }
+
+ // Case 2: Input is a number or String number.
+ LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal)
schema.getLogicalType();
+ Pair<Boolean, BigDecimal> parseResult = parseObjectToBigDecimal(value);
+ if (Boolean.FALSE.equals(parseResult.getLeft())) {
+ return Pair.of(false, null);
+ }
+ BigDecimal bigDecimal = parseResult.getRight();
+
+ // As we don't do rounding, the validation will enforce the scale part
and the integer part are all within the
+ // limit. As a result, if scale is 2 precision is 5, we only allow 3
digits for the integer.
+ // Allowed: 123.45, 123, 0.12
+ // Disallowed: 1234 (4 digit integer while the scale has already
reserved 2 digit out of the 5 digit precision)
+ // 123456, 0.12345
+ if (bigDecimal.scale() > decimalType.getScale()
+ || (bigDecimal.precision() - bigDecimal.scale()) >
(decimalType.getPrecision() - decimalType.getScale())) {
+ // Correspond to case
+ // org.apache.avro.AvroTypeException: Cannot encode decimal with
scale 5 as scale 2 without rounding.
+ // org.apache.avro.AvroTypeException: Cannot encode decimal with
scale 3 as scale 2 without rounding
+ return Pair.of(false, null);
+ }
+
+ switch (schema.getType()) {
+ case BYTES:
+ // Convert to primitive Arvo type that logical type Decimal uses.
+ ByteBuffer byteBuffer = new
Conversions.DecimalConversion().toBytes(bigDecimal, schema, decimalType);
+ return Pair.of(true, byteBuffer);
+ case FIXED:
+ GenericFixed fixedValue = new
Conversions.DecimalConversion().toFixed(bigDecimal, schema, decimalType);
+ return Pair.of(true, fixedValue);
+ default: {
+ return Pair.of(false, null);
+ }
}
- return Pair.of(false, null);
}
- };
- }
- private static JsonToAvroFieldProcessor generateIntTypeHandler() {
- return new JsonToAvroFieldProcessor() {
- @Override
- public Pair<Boolean, Object> convert(Object value, String name, Schema
schema, boolean shouldSanitize, String invalidCharMask) {
- if (value instanceof Number) {
- return Pair.of(true, ((Number) value).intValue());
- } else if (value instanceof String) {
- return Pair.of(true, Integer.valueOf((String) value));
+ /**
+ * Check if the given schema is a valid decimal type configuration.
+ */
+ private static boolean isValidDecimalTypeConfig(Schema schema) {
+ LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal)
schema.getLogicalType();
+ // At the time when the schema is found not valid when it is parsed,
the Avro Schema.parse will just silently
+ // set the schema to be null instead of throwing exceptions.
Correspondingly, we just check if it is null here.
+ if (decimalType == null) {
+ return false;
}
- return Pair.of(false, null);
+ // Even though schema is validated at schema parsing phase, still
validate here to be defensive.
+ decimalType.validate(schema);
+ return true;
}
- };
- }
- private static JsonToAvroFieldProcessor generateDoubleTypeHandler() {
- return new JsonToAvroFieldProcessor() {
- @Override
- public Pair<Boolean, Object> convert(Object value, String name, Schema
schema, boolean shouldSanitize, String invalidCharMask) {
- if (value instanceof Number) {
- return Pair.of(true, ((Number) value).doubleValue());
- } else if (value instanceof String) {
- return Pair.of(true, Double.valueOf((String) value));
+ /**
+ * Parse the object to BigDecimal.
+ *
+ * @param obj Object to be parsed
+ * @return Pair object, with left as boolean indicating if the parsing
was successful and right as the
+ * BigDecimal value.
+ */
+ private static Pair<Boolean, BigDecimal> parseObjectToBigDecimal(Object
obj) {
+ // Case 1: Object is a number.
+ if (obj instanceof Number) {
+ return Pair.of(true, BigDecimal.valueOf(((Number)
obj).doubleValue()));
+ }
+
+ // Case 2: Object is a number in String format.
+ if (obj instanceof String) {
+ BigDecimal bigDecimal = null;
+ try {
+ bigDecimal = new BigDecimal(((String) obj));
+ } catch (java.lang.NumberFormatException ignored) {
+ /* ignore */
+ }
+ return Pair.of(bigDecimal != null, bigDecimal);
}
return Pair.of(false, null);
}
- };
- }
+ }
- private static JsonToAvroFieldProcessor generateFloatTypeHandler() {
- return new JsonToAvroFieldProcessor() {
+ private static class DurationLogicalTypeProcessor extends
JsonToAvroFieldProcessor {
+ private static final int NUM_ELEMENTS_FOR_DURATION_TYPE = 3;
+
+ /**
+ * We expect the input to be a list of 3 integers representing months,
days and milliseconds.
+ */
+ private boolean isValidDurationInput(Object value) {
+ if (!(value instanceof List<?>)) {
+ return false;
+ }
+ List<?> list = (List<?>) value;
+ if (list.size() != NUM_ELEMENTS_FOR_DURATION_TYPE) {
+ return false;
+ }
+ for (Object element : list) {
+ if (!(element instanceof Integer)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Convert the given object to Avro object with schema whose logical
type is duration.
+ */
@Override
public Pair<Boolean, Object> convert(Object value, String name, Schema
schema, boolean shouldSanitize, String invalidCharMask) {
- if (value instanceof Number) {
- return Pair.of(true, ((Number) value).floatValue());
- } else if (value instanceof String) {
- return Pair.of(true, Float.valueOf((String) value));
+
+ if (!isValidDurationTypeConfig(schema)) {
+ return Pair.of(false, null);
}
- return Pair.of(false, null);
+ if (!isValidDurationInput(value)) {
+ return Pair.of(false, null);
+ }
+ // After the validation the input can be safely cast to List<Integer>
with 3 elements.
+ List<?> list = (List<?>) value;
+ List<Integer> converval = list.stream()
+ .filter(Integer.class::isInstance)
+ .map(Integer.class::cast)
+ .collect(Collectors.toList());
+
+ ByteBuffer buffer =
ByteBuffer.allocate(schema.getFixedSize()).order(ByteOrder.LITTLE_ENDIAN);
+ for (Integer element : converval) {
+ buffer.putInt(element); // months
+ }
+ return Pair.of(true, new GenericData.Fixed(schema, buffer.array()));
+ }
+
+ /**
+ * Check if the given schema is a valid decimal type configuration.
+ */
+ private static boolean isValidDurationTypeConfig(Schema schema) {
+ String durationTypeName = AvroLogicalTypeEnum.DURATION.getValue();
+ LogicalType durationType = schema.getLogicalType();
+ String durationTypeProp = schema.getProp("logicalType");
+ // 1. The Avro type should be "Fixed".
+ // 2. Fixed size must be of 12 bytes as it hold 3 integers.
+ // 3. Logical type name should be "duration". The name might be stored
in different places based on Avro version
+ // being used here.
+ return schema.getType().equals(Type.FIXED)
+ && schema.getFixedSize() == Integer.BYTES *
NUM_ELEMENTS_FOR_DURATION_TYPE
+ && (durationType != null &&
durationType.getName().equals(durationTypeName)
+ || durationTypeProp != null &&
durationTypeProp.equals(durationTypeName));
+ }
+ }
+
+ /**
+ * Processor utility handling Number inputs. Consumed by
TimeLogicalTypeProcessor.
+ */
+ private interface NumericParser {
+ // Convert the input number to Avro data type according to the class
+ // implementing this interface.
+ Pair<Boolean, Object> handleNumberValue(Number value);
+
+ // Convert the input number to Avro data type according to the class
+ // implementing this interface.
+ // @param value the input number in string format.
+ Pair<Boolean, Object> handleStringNumber(String value);
+
+ interface IntParser extends NumericParser {
+ @Override
+ default Pair<Boolean, Object> handleNumberValue(Number value) {
+ return Pair.of(true, value.intValue());
+ }
+
+ @Override
+ default Pair<Boolean, Object> handleStringNumber(String value) {
+ return Pair.of(true, Integer.parseInt(value));
+ }
+ }
+
+ interface LongParser extends NumericParser {
+ @Override
+ default Pair<Boolean, Object> handleNumberValue(Number value) {
+ return Pair.of(true, value.longValue());
+ }
+
+ @Override
+ default Pair<Boolean, Object> handleStringNumber(String value) {
+ return Pair.of(true, Long.parseLong(value));
+ }
+ }
+ }
+
+ /**
+ * Base Class for converting object to avro logical type
TimeMilli/TimeMicro.
+ */
+ private abstract static class TimeLogicalTypeProcessor extends
JsonToAvroFieldProcessor implements NumericParser {
+
+ protected static final LocalDateTime LOCAL_UNIX_EPOCH =
LocalDateTime.of(1970, 1, 1, 0, 0, 0, 0);
+
+ // Logical type the processor is handling.
+ private final AvroLogicalTypeEnum logicalTypeEnum;
+
+ public TimeLogicalTypeProcessor(AvroLogicalTypeEnum logicalTypeEnum) {
+ this.logicalTypeEnum = logicalTypeEnum;
}
- };
- }
- private static JsonToAvroFieldProcessor generateLongTypeHandler() {
- return new JsonToAvroFieldProcessor() {
+ /**
+ * Main function that convert input to Object with java data type
specified by schema
+ */
@Override
public Pair<Boolean, Object> convert(Object value, String name, Schema
schema, boolean shouldSanitize, String invalidCharMask) {
+ LogicalType logicalType = schema.getLogicalType();
+ if (logicalType == null) {
+ return Pair.of(false, null);
+ }
+ logicalType.validate(schema);
if (value instanceof Number) {
- return Pair.of(true, ((Number) value).longValue());
- } else if (value instanceof String) {
- return Pair.of(true, Long.valueOf((String) value));
+ return handleNumberValue((Number) value);
+ }
+ if (value instanceof String) {
+ String valStr = (String) value;
+ if (ALL_DIGITS_WITH_OPTIONAL_SIGN.matcher(valStr).matches()) {
+ return handleStringNumber(valStr);
+ } else if (isWellFormedDateTime(valStr)) {
+ return handleStringValue(valStr);
+ }
}
return Pair.of(false, null);
}
- };
- }
- private static JsonToAvroFieldProcessor generateStringTypeHandler() {
- return new JsonToAvroFieldProcessor() {
+ // Handle the case when the input is a string that may be parsed as a
time.
+ protected abstract Pair<Boolean, Object> handleStringValue(String value);
+
+ protected DateTimeFormatter getDateTimeFormatter() {
+ DateTimeParseContext ctx =
DATE_TIME_PARSE_CONTEXT_MAP.get(logicalTypeEnum);
+ assert ctx != null : String.format("%s should have configured date
time context.", logicalTypeEnum.getValue());
+ return ctx.dateTimeFormatter;
+ }
+
+ protected Pattern getDateTimePattern() {
+ DateTimeParseContext ctx =
DATE_TIME_PARSE_CONTEXT_MAP.get(logicalTypeEnum);
+ assert ctx != null : String.format("%s should have configured date
time context.", logicalTypeEnum.getValue());
+ return ctx.dateTimePattern;
+ }
+
+ // Depending on the logical type the processor handles, they use
different parsing context
+ // when they need to parse a timestamp string in handleStringValue.
+ private static class DateTimeParseContext {
+ public DateTimeParseContext(DateTimeFormatter dateTimeFormatter,
Pattern dateTimePattern) {
+ this.dateTimeFormatter = dateTimeFormatter;
+ this.dateTimePattern = dateTimePattern;
+ }
+
+ public final Pattern dateTimePattern;
+
+ public final DateTimeFormatter dateTimeFormatter;
+ }
+
+ private static final Map<AvroLogicalTypeEnum, DateTimeParseContext>
DATE_TIME_PARSE_CONTEXT_MAP = getParseContext();
+
+ private static Map<AvroLogicalTypeEnum, DateTimeParseContext>
getParseContext() {
+ // Formatter for parsing local timestamp. It ignores the time zone
info of the timestamp.
Review Comment:
```suggestion
// Formatter for parsing a timestamp. It assumes UTC timezone, with 'Z' as the zone ID.
```
##########
hudi-common/src/test/java/org/apache/hudi/avro/TestMercifulJsonConverter.java:
##########
@@ -55,6 +70,649 @@ public void basicConversion() throws IOException {
Assertions.assertEquals(rec, CONVERTER.convert(json, simpleSchema));
}
+ private static final String DECIMAL_AVRO_FILE_INVALID_PATH =
"/decimal-logical-type-invalid.avsc";
+ private static final String DECIMAL_AVRO_FILE_PATH =
"/decimal-logical-type.avsc";
+ private static final String DECIMAL_FIXED_AVRO_FILE_PATH =
"/decimal-logical-type-fixed-type.avsc";
+ /**
+ * Covered case:
+ * Avro Logical Type: Decimal
+ * Exhaustive unsupported input coverage.
+ */
+ @ParameterizedTest
+ @MethodSource("decimalBadCases")
+ void decimalLogicalTypeInvalidCaseTest(String avroFile, String strInput,
Double numInput,
+ boolean testFixedByteArray) throws
IOException {
+ Schema schema = SchemaTestUtil.getSchemaFromResourceFilePath(avroFile);
+
+ Map<String, Object> data = new HashMap<>();
+ if (strInput != null) {
+ data.put("decimalField", strInput);
+ } else if (numInput != null) {
+ data.put("decimalField", numInput);
+ } else if (testFixedByteArray) {
+ // Convert the fixed value to int array, which is used as json value
literals.
+ int[] intArray = {0, 0, 48, 57};
+ data.put("decimalField", intArray);
+ }
+ String json = MAPPER.writeValueAsString(data);
+
+ // Schedule with timestamp same as that of committed instant
+
assertThrows(MercifulJsonConverter.HoodieJsonToAvroConversionException.class,
() -> {
+ CONVERTER.convert(json, schema);
+ });
+ }
+
+ static Stream<Object> decimalBadCases() {
+ return Stream.of(
+ // Invalid schema definition.
+ Arguments.of(DECIMAL_AVRO_FILE_INVALID_PATH, "123.45", null, false),
+ // Schema set precision as 5, input overwhelmed the precision.
+ Arguments.of(DECIMAL_AVRO_FILE_PATH, "123333.45", null, false),
+ Arguments.of(DECIMAL_AVRO_FILE_PATH, null, 123333.45, false),
+ // Schema precision set to 5, scale set to 2, so there is only 3 digit
to accommodate integer part.
+ // As we do not do rounding, any input with more than 3 digit integer
would fail.
+ Arguments.of(DECIMAL_AVRO_FILE_PATH, "1233", null, false),
+ Arguments.of(DECIMAL_AVRO_FILE_PATH, null, 1233D, false),
+ // Schema set scale as 2, input overwhelmed the scale.
+ Arguments.of(DECIMAL_AVRO_FILE_PATH, "0.222", null, false),
+ Arguments.of(DECIMAL_AVRO_FILE_PATH, null, 0.222, false),
+ // Invalid string which cannot be parsed as number.
+ Arguments.of(DECIMAL_AVRO_FILE_PATH, "", null, false),
+ Arguments.of(DECIMAL_AVRO_FILE_PATH, "NotAValidString", null, false),
+ Arguments.of(DECIMAL_AVRO_FILE_PATH, "-", null, false),
+ // Schema requires byte type while input is fixed type raw data.
+ Arguments.of(DECIMAL_AVRO_FILE_PATH, null, null, true)
+ );
+ }
+
+ /**
+ * Covered case:
+ * Avro Logical Type: Decimal
+ * Avro type: bytes, fixed
+ * Input: Check test parameter
+ * Output: Object using Byte data type as the schema specified.
+ * */
+ @ParameterizedTest
+ @MethodSource("decimalGoodCases")
+ void decimalLogicalTypeTest(String avroFilePath, String groundTruth, String
strInput,
+ Double numInput, boolean testFixedByteArray)
throws IOException {
+ BigDecimal bigDecimal = new BigDecimal(groundTruth);
+ Map<String, Object> data = new HashMap<>();
+
+ Schema schema = SchemaTestUtil.getSchemaFromResourceFilePath(avroFilePath);
+ GenericRecord record = new GenericData.Record(schema);
+ Conversions.DecimalConversion conv = new Conversions.DecimalConversion();
+ Schema decimalFieldSchema = schema.getField("decimalField").schema();
+
+ // Decide the decimal field input according to the test dimension.
+ if (strInput != null) {
+ data.put("decimalField", strInput); // String number input
+ } else if (numInput != null) {
+ data.put("decimalField", numInput); // Number input
+ } else if (testFixedByteArray) {
+ // Fixed byte array input.
+ // Example: 123.45 - byte array [0, 0, 48, 57].
+ Schema fieldSchema = schema.getField("decimalField").schema();
+ GenericFixed fixedValue = new Conversions.DecimalConversion().toFixed(
+ bigDecimal, fieldSchema, fieldSchema.getLogicalType());
+ // Convert the fixed value to int array, which is used as json value
literals.
+ byte[] byteArray = fixedValue.bytes();
+ int[] intArray = new int[byteArray.length];
+ for (int i = 0; i < byteArray.length; i++) {
+ // Byte is signed in Java, int is 32-bit. Convert by & 0xFF to handle
negative values correctly.
+ intArray[i] = byteArray[i] & 0xFF;
+ }
+ data.put("decimalField", intArray);
+ }
+
+ // Decide the decimal field expected output according to the test
dimension.
+ if (avroFilePath.equals(DECIMAL_AVRO_FILE_PATH)) {
+ record.put("decimalField", conv.toBytes(bigDecimal, decimalFieldSchema,
decimalFieldSchema.getLogicalType()));
+ } else {
+ record.put("decimalField", conv.toFixed(bigDecimal, decimalFieldSchema,
decimalFieldSchema.getLogicalType()));
+ }
+
+ String json = MAPPER.writeValueAsString(data);
+
+ GenericRecord real = CONVERTER.convert(json, schema);
+ Assertions.assertEquals(record, real);
+ }
+
+ static Stream<Object> decimalGoodCases() {
+ return Stream.of(
+ // The schema all set precision as 5, scale as 2.
+ // Test dimension: Schema file, Ground truth, string input, number
input, fixed byte array input.
+ // Test some random numbers.
+ Arguments.of(DECIMAL_AVRO_FILE_PATH, "123.45", "123.45", null, false),
+ Arguments.of(DECIMAL_AVRO_FILE_PATH, "123.45", null, 123.45, false),
+ // Test MIN/MAX allowed by the schema.
+ Arguments.of(DECIMAL_AVRO_FILE_PATH, "-999.99", "-999.99", null,
false),
+ Arguments.of(DECIMAL_AVRO_FILE_PATH, "999.99",null, 999.99, false),
+ // Test 0.
+ Arguments.of(DECIMAL_AVRO_FILE_PATH, "0", null, 0D, false),
+ Arguments.of(DECIMAL_AVRO_FILE_PATH, "0", "0", null, false),
+ Arguments.of(DECIMAL_AVRO_FILE_PATH, "0", "000.00", null, false),
+ // Same set of coverage over schame using byte/fixed type.
+ Arguments.of(DECIMAL_FIXED_AVRO_FILE_PATH, "123.45", "123.45", null,
false),
+ Arguments.of(DECIMAL_FIXED_AVRO_FILE_PATH, "123.45", null, 123.45,
false),
+ Arguments.of(DECIMAL_FIXED_AVRO_FILE_PATH, "-999.99", "-999.99", null,
false),
+ Arguments.of(DECIMAL_FIXED_AVRO_FILE_PATH, "999.99",null, 999.99,
false),
+ Arguments.of(DECIMAL_FIXED_AVRO_FILE_PATH, "0", null, 0D, false),
+ Arguments.of(DECIMAL_FIXED_AVRO_FILE_PATH, "0", "0", null, true),
+ Arguments.of(DECIMAL_FIXED_AVRO_FILE_PATH, "0", "000.00", null, true),
+ Arguments.of(DECIMAL_FIXED_AVRO_FILE_PATH, "123.45", null, null, true),
+ Arguments.of(DECIMAL_FIXED_AVRO_FILE_PATH, "123.45", null, 123.45,
true),
+ Arguments.of(DECIMAL_FIXED_AVRO_FILE_PATH, "-999.99", null, null,
true),
+ Arguments.of(DECIMAL_FIXED_AVRO_FILE_PATH, "999.99", null, 999.99,
true),
+ Arguments.of(DECIMAL_FIXED_AVRO_FILE_PATH, "0", null, null, true),
+ Arguments.of(DECIMAL_FIXED_AVRO_FILE_PATH, "0", null, null, true),
Review Comment:
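Remove duplicate entries. `Arguments.of(DECIMAL_FIXED_AVRO_FILE_PATH, "0", null, null, true)` is listed twice. Also note that the test checks `strInput` first, then `numInput`, and only then `testFixedByteArray`. A case that sets a string or number input together with `testFixedByteArray = true` therefore never exercises the fixed byte-array path. For example, `(DECIMAL_FIXED_AVRO_FILE_PATH, "123.45", null, 123.45, true)` runs the same test as `(DECIMAL_FIXED_AVRO_FILE_PATH, "123.45", null, 123.45, false)`.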
##########
hudi-common/src/main/java/org/apache/hudi/avro/MercifulJsonConverter.java:
##########
@@ -187,196 +180,772 @@ private static Object convertJsonToAvroField(Object
value, String name, Schema s
throw new HoodieJsonToAvroConversionException(null, name, schema,
shouldSanitize, invalidCharMask);
}
- JsonToAvroFieldProcessor processor =
FIELD_TYPE_PROCESSORS.get(schema.getType());
- if (null != processor) {
- return processor.convertToAvro(value, name, schema, shouldSanitize,
invalidCharMask);
- }
- throw new IllegalArgumentException("JsonConverter cannot handle type: " +
schema.getType());
+ return JsonToAvroFieldProcessorUtil.convertToAvro(value, name, schema,
shouldSanitize, invalidCharMask);
}
- /**
- * Base Class for converting json to avro fields.
- */
- private abstract static class JsonToAvroFieldProcessor implements
Serializable {
+ private static class JsonToAvroFieldProcessorUtil {
+ /**
+ * Base Class for converting json to avro fields.
+ */
+ private abstract static class JsonToAvroFieldProcessor implements
Serializable {
- public Object convertToAvro(Object value, String name, Schema schema,
boolean shouldSanitize, String invalidCharMask) {
- Pair<Boolean, Object> res = convert(value, name, schema, shouldSanitize,
invalidCharMask);
- if (!res.getLeft()) {
- throw new HoodieJsonToAvroConversionException(value, name, schema,
shouldSanitize, invalidCharMask);
+ public Object convertToAvro(Object value, String name, Schema schema,
boolean shouldSanitize, String invalidCharMask) {
+ Pair<Boolean, Object> res = convert(value, name, schema,
shouldSanitize, invalidCharMask);
+ if (!res.getLeft()) {
+ throw new HoodieJsonToAvroConversionException(value, name, schema,
shouldSanitize, invalidCharMask);
+ }
+ return res.getRight();
}
- return res.getRight();
+
+ protected abstract Pair<Boolean, Object> convert(Object value, String
name, Schema schema, boolean shouldSanitize, String invalidCharMask);
}
- protected abstract Pair<Boolean, Object> convert(Object value, String
name, Schema schema, boolean shouldSanitize, String invalidCharMask);
- }
+ public static Object convertToAvro(Object value, String name, Schema
schema, boolean shouldSanitize, String invalidCharMask) {
+ JsonToAvroFieldProcessor processor = getProcessorForSchema(schema);
+ return processor.convertToAvro(value, name, schema, shouldSanitize,
invalidCharMask);
+ }
+
+ private static JsonToAvroFieldProcessor getProcessorForSchema(Schema
schema) {
+ JsonToAvroFieldProcessor processor = null;
+
+ // 3 cases to consider: customized logicalType, logicalType, and type.
+ String customizedLogicalType = schema.getProp("logicalType");
+ LogicalType logicalType = schema.getLogicalType();
+ Type type = schema.getType();
+ if (customizedLogicalType != null && !customizedLogicalType.isEmpty()) {
+ processor =
AVRO_LOGICAL_TYPE_FIELD_PROCESSORS.get(customizedLogicalType);
+ } else if (logicalType != null) {
+ processor =
AVRO_LOGICAL_TYPE_FIELD_PROCESSORS.get(logicalType.getName());
+ } else {
+ processor = AVRO_TYPE_FIELD_TYPE_PROCESSORS.get(type);
+ }
+
+ ValidationUtils.checkArgument(
+ processor != null, String.format("JsonConverter cannot handle type:
%s", type));
+ return processor;
+ }
+
+ // Avro primitive and complex type processors.
+ private static final Map<Schema.Type, JsonToAvroFieldProcessor>
AVRO_TYPE_FIELD_TYPE_PROCESSORS = getFieldTypeProcessors();
+ // Avro logical type processors.
+ private static final Map<String, JsonToAvroFieldProcessor>
AVRO_LOGICAL_TYPE_FIELD_PROCESSORS = getLogicalFieldTypeProcessors();
+
+ /**
+ * Build type processor map for each avro type.
+ */
+ private static Map<Schema.Type, JsonToAvroFieldProcessor>
getFieldTypeProcessors() {
+ Map<Schema.Type, JsonToAvroFieldProcessor> fieldTypeProcessors = new
EnumMap<>(Schema.Type.class);
+ fieldTypeProcessors.put(Type.STRING, generateStringTypeHandler());
+ fieldTypeProcessors.put(Type.BOOLEAN, generateBooleanTypeHandler());
+ fieldTypeProcessors.put(Type.DOUBLE, generateDoubleTypeHandler());
+ fieldTypeProcessors.put(Type.FLOAT, generateFloatTypeHandler());
+ fieldTypeProcessors.put(Type.INT, generateIntTypeHandler());
+ fieldTypeProcessors.put(Type.LONG, generateLongTypeHandler());
+ fieldTypeProcessors.put(Type.ARRAY, generateArrayTypeHandler());
+ fieldTypeProcessors.put(Type.RECORD, generateRecordTypeHandler());
+ fieldTypeProcessors.put(Type.ENUM, generateEnumTypeHandler());
+ fieldTypeProcessors.put(Type.MAP, generateMapTypeHandler());
+ fieldTypeProcessors.put(Type.BYTES, generateBytesTypeHandler());
+ fieldTypeProcessors.put(Type.FIXED, generateFixedTypeHandler());
+ return Collections.unmodifiableMap(fieldTypeProcessors);
+ }
- private static JsonToAvroFieldProcessor generateBooleanTypeHandler() {
- return new JsonToAvroFieldProcessor() {
+ private static Map<String, JsonToAvroFieldProcessor>
getLogicalFieldTypeProcessors() {
+ return CollectionUtils.createImmutableMap(
+ Pair.of(AvroLogicalTypeEnum.DECIMAL.getValue(), new
DecimalLogicalTypeProcessor()),
+ Pair.of(AvroLogicalTypeEnum.TIME_MICROS.getValue(), new
TimeMicroLogicalTypeProcessor()),
+ Pair.of(AvroLogicalTypeEnum.TIME_MILLIS.getValue(), new
TimeMilliLogicalTypeProcessor()),
+ Pair.of(AvroLogicalTypeEnum.DATE.getValue(), new
DateLogicalTypeProcessor()),
+ Pair.of(AvroLogicalTypeEnum.LOCAL_TIMESTAMP_MICROS.getValue(), new
LocalTimestampMicroLogicalTypeProcessor()),
+ Pair.of(AvroLogicalTypeEnum.LOCAL_TIMESTAMP_MILLIS.getValue(), new
LocalTimestampMilliLogicalTypeProcessor()),
+ Pair.of(AvroLogicalTypeEnum.TIMESTAMP_MICROS.getValue(), new
TimestampMicroLogicalTypeProcessor()),
+ Pair.of(AvroLogicalTypeEnum.TIMESTAMP_MILLIS.getValue(), new
TimestampMilliLogicalTypeProcessor()),
+ Pair.of(AvroLogicalTypeEnum.DURATION.getValue(), new
DurationLogicalTypeProcessor()),
+ Pair.of(AvroLogicalTypeEnum.UUID.getValue(),
generateStringTypeHandler()));
+ }
+
+ private static class DecimalLogicalTypeProcessor extends
JsonToAvroFieldProcessor {
@Override
public Pair<Boolean, Object> convert(Object value, String name, Schema
schema, boolean shouldSanitize, String invalidCharMask) {
- if (value instanceof Boolean) {
- return Pair.of(true, value);
+
+ if (!isValidDecimalTypeConfig(schema)) {
+ return Pair.of(false, null);
+ }
+
+ // Case 1: Input is a list. It is expected to be raw Fixed byte array
input, and we only support
+ // parsing it to Fixed avro type.
+ if (value instanceof List<?> && schema.getType() == Type.FIXED) {
+ JsonToAvroFieldProcessor processor = generateFixedTypeHandler();
+ return processor.convert(value, name, schema, shouldSanitize,
invalidCharMask);
+ }
+
+ // Case 2: Input is a number or String number.
+ LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal)
schema.getLogicalType();
+ Pair<Boolean, BigDecimal> parseResult = parseObjectToBigDecimal(value);
+ if (Boolean.FALSE.equals(parseResult.getLeft())) {
+ return Pair.of(false, null);
+ }
+ BigDecimal bigDecimal = parseResult.getRight();
+
+ // As we don't do rounding, the validation will enforce the scale part
and the integer part are all within the
+ // limit. As a result, if scale is 2 precision is 5, we only allow 3
digits for the integer.
+ // Allowed: 123.45, 123, 0.12
+ // Disallowed: 1234 (4 digit integer while the scale has already
reserved 2 digit out of the 5 digit precision)
+ // 123456, 0.12345
+ if (bigDecimal.scale() > decimalType.getScale()
+ || (bigDecimal.precision() - bigDecimal.scale()) >
(decimalType.getPrecision() - decimalType.getScale())) {
+ // Correspond to case
+ // org.apache.avro.AvroTypeException: Cannot encode decimal with
scale 5 as scale 2 without rounding.
+ // org.apache.avro.AvroTypeException: Cannot encode decimal with
scale 3 as scale 2 without rounding
+ return Pair.of(false, null);
+ }
+
+ switch (schema.getType()) {
+ case BYTES:
+ // Convert to primitive Arvo type that logical type Decimal uses.
+ ByteBuffer byteBuffer = new
Conversions.DecimalConversion().toBytes(bigDecimal, schema, decimalType);
+ return Pair.of(true, byteBuffer);
+ case FIXED:
+ GenericFixed fixedValue = new
Conversions.DecimalConversion().toFixed(bigDecimal, schema, decimalType);
+ return Pair.of(true, fixedValue);
+ default: {
+ return Pair.of(false, null);
+ }
}
- return Pair.of(false, null);
}
- };
- }
- private static JsonToAvroFieldProcessor generateIntTypeHandler() {
- return new JsonToAvroFieldProcessor() {
- @Override
- public Pair<Boolean, Object> convert(Object value, String name, Schema
schema, boolean shouldSanitize, String invalidCharMask) {
- if (value instanceof Number) {
- return Pair.of(true, ((Number) value).intValue());
- } else if (value instanceof String) {
- return Pair.of(true, Integer.valueOf((String) value));
+ /**
+ * Check if the given schema is a valid decimal type configuration.
+ */
+ private static boolean isValidDecimalTypeConfig(Schema schema) {
+ LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal)
schema.getLogicalType();
+ // At the time when the schema is found not valid when it is parsed,
the Avro Schema.parse will just silently
+ // set the schema to be null instead of throwing exceptions.
Correspondingly, we just check if it is null here.
+ if (decimalType == null) {
+ return false;
}
- return Pair.of(false, null);
+ // Even though schema is validated at schema parsing phase, still
validate here to be defensive.
+ decimalType.validate(schema);
+ return true;
}
- };
- }
- private static JsonToAvroFieldProcessor generateDoubleTypeHandler() {
- return new JsonToAvroFieldProcessor() {
- @Override
- public Pair<Boolean, Object> convert(Object value, String name, Schema
schema, boolean shouldSanitize, String invalidCharMask) {
- if (value instanceof Number) {
- return Pair.of(true, ((Number) value).doubleValue());
- } else if (value instanceof String) {
- return Pair.of(true, Double.valueOf((String) value));
+ /**
+ * Parse the object to BigDecimal.
+ *
+ * @param obj Object to be parsed
+ * @return Pair object, with left as boolean indicating if the parsing
was successful and right as the
+ * BigDecimal value.
+ */
+ private static Pair<Boolean, BigDecimal> parseObjectToBigDecimal(Object
obj) {
+ // Case 1: Object is a number.
+ if (obj instanceof Number) {
+ return Pair.of(true, BigDecimal.valueOf(((Number)
obj).doubleValue()));
+ }
+
+ // Case 2: Object is a number in String format.
+ if (obj instanceof String) {
+ BigDecimal bigDecimal = null;
+ try {
+ bigDecimal = new BigDecimal(((String) obj));
+ } catch (java.lang.NumberFormatException ignored) {
+ /* ignore */
+ }
+ return Pair.of(bigDecimal != null, bigDecimal);
}
return Pair.of(false, null);
}
- };
- }
+ }
- private static JsonToAvroFieldProcessor generateFloatTypeHandler() {
- return new JsonToAvroFieldProcessor() {
+ private static class DurationLogicalTypeProcessor extends
JsonToAvroFieldProcessor {
+ private static final int NUM_ELEMENTS_FOR_DURATION_TYPE = 3;
+
+ /**
+ * We expect the input to be a list of 3 integers representing months,
days and milliseconds.
+ */
+ private boolean isValidDurationInput(Object value) {
+ if (!(value instanceof List<?>)) {
+ return false;
+ }
+ List<?> list = (List<?>) value;
+ if (list.size() != NUM_ELEMENTS_FOR_DURATION_TYPE) {
+ return false;
+ }
+ for (Object element : list) {
+ if (!(element instanceof Integer)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Convert the given object to Avro object with schema whose logical
type is duration.
+ */
@Override
public Pair<Boolean, Object> convert(Object value, String name, Schema
schema, boolean shouldSanitize, String invalidCharMask) {
- if (value instanceof Number) {
- return Pair.of(true, ((Number) value).floatValue());
- } else if (value instanceof String) {
- return Pair.of(true, Float.valueOf((String) value));
+
+ if (!isValidDurationTypeConfig(schema)) {
+ return Pair.of(false, null);
}
- return Pair.of(false, null);
+ if (!isValidDurationInput(value)) {
+ return Pair.of(false, null);
+ }
+ // After the validation the input can be safely cast to List<Integer>
with 3 elements.
+ List<?> list = (List<?>) value;
+ List<Integer> converval = list.stream()
+ .filter(Integer.class::isInstance)
+ .map(Integer.class::cast)
+ .collect(Collectors.toList());
+
+ ByteBuffer buffer =
ByteBuffer.allocate(schema.getFixedSize()).order(ByteOrder.LITTLE_ENDIAN);
+ for (Integer element : converval) {
+ buffer.putInt(element); // months
+ }
+ return Pair.of(true, new GenericData.Fixed(schema, buffer.array()));
+ }
+
+ /**
+ * Check if the given schema is a valid decimal type configuration.
+ */
+ private static boolean isValidDurationTypeConfig(Schema schema) {
+ String durationTypeName = AvroLogicalTypeEnum.DURATION.getValue();
+ LogicalType durationType = schema.getLogicalType();
+ String durationTypeProp = schema.getProp("logicalType");
+ // 1. The Avro type should be "Fixed".
+ // 2. Fixed size must be of 12 bytes as it hold 3 integers.
+ // 3. Logical type name should be "duration". The name might be stored
in different places based on Avro version
+ // being used here.
+ return schema.getType().equals(Type.FIXED)
+ && schema.getFixedSize() == Integer.BYTES *
NUM_ELEMENTS_FOR_DURATION_TYPE
+ && (durationType != null &&
durationType.getName().equals(durationTypeName)
+ || durationTypeProp != null &&
durationTypeProp.equals(durationTypeName));
+ }
+ }
+
+ /**
+ * Processor utility handling Number inputs. Consumed by
TimeLogicalTypeProcessor.
+ */
+ private interface NumericParser {
+ // Convert the input number to Avro data type according to the class
+ // implementing this interface.
+ Pair<Boolean, Object> handleNumberValue(Number value);
+
+ // Convert the input number to Avro data type according to the class
+ // implementing this interface.
+ // @param value the input number in string format.
+ Pair<Boolean, Object> handleStringNumber(String value);
+
+ interface IntParser extends NumericParser {
+ @Override
+ default Pair<Boolean, Object> handleNumberValue(Number value) {
+ return Pair.of(true, value.intValue());
+ }
+
+ @Override
+ default Pair<Boolean, Object> handleStringNumber(String value) {
+ return Pair.of(true, Integer.parseInt(value));
+ }
+ }
+
+ interface LongParser extends NumericParser {
+ @Override
+ default Pair<Boolean, Object> handleNumberValue(Number value) {
+ return Pair.of(true, value.longValue());
+ }
+
+ @Override
+ default Pair<Boolean, Object> handleStringNumber(String value) {
+ return Pair.of(true, Long.parseLong(value));
+ }
+ }
+ }
+
+ /**
+ * Base Class for converting object to avro logical type
TimeMilli/TimeMicro.
+ */
+ private abstract static class TimeLogicalTypeProcessor extends
JsonToAvroFieldProcessor implements NumericParser {
+
+ protected static final LocalDateTime LOCAL_UNIX_EPOCH =
LocalDateTime.of(1970, 1, 1, 0, 0, 0, 0);
+
+ // Logical type the processor is handling.
+ private final AvroLogicalTypeEnum logicalTypeEnum;
+
+ public TimeLogicalTypeProcessor(AvroLogicalTypeEnum logicalTypeEnum) {
+ this.logicalTypeEnum = logicalTypeEnum;
}
- };
- }
- private static JsonToAvroFieldProcessor generateLongTypeHandler() {
- return new JsonToAvroFieldProcessor() {
+ /**
+ * Main function that convert input to Object with java data type
specified by schema
+ */
@Override
public Pair<Boolean, Object> convert(Object value, String name, Schema
schema, boolean shouldSanitize, String invalidCharMask) {
+ LogicalType logicalType = schema.getLogicalType();
+ if (logicalType == null) {
+ return Pair.of(false, null);
+ }
+ logicalType.validate(schema);
if (value instanceof Number) {
- return Pair.of(true, ((Number) value).longValue());
- } else if (value instanceof String) {
- return Pair.of(true, Long.valueOf((String) value));
+ return handleNumberValue((Number) value);
+ }
+ if (value instanceof String) {
+ String valStr = (String) value;
+ if (ALL_DIGITS_WITH_OPTIONAL_SIGN.matcher(valStr).matches()) {
+ return handleStringNumber(valStr);
+ } else if (isWellFormedDateTime(valStr)) {
+ return handleStringValue(valStr);
+ }
}
return Pair.of(false, null);
}
- };
- }
- private static JsonToAvroFieldProcessor generateStringTypeHandler() {
- return new JsonToAvroFieldProcessor() {
+ // Handle the case when the input is a string that may be parsed as a
time.
+ protected abstract Pair<Boolean, Object> handleStringValue(String value);
+
+ protected DateTimeFormatter getDateTimeFormatter() {
+ DateTimeParseContext ctx =
DATE_TIME_PARSE_CONTEXT_MAP.get(logicalTypeEnum);
+ assert ctx != null : String.format("%s should have configured date
time context.", logicalTypeEnum.getValue());
+ return ctx.dateTimeFormatter;
+ }
+
+ protected Pattern getDateTimePattern() {
+ DateTimeParseContext ctx =
DATE_TIME_PARSE_CONTEXT_MAP.get(logicalTypeEnum);
+ assert ctx != null : String.format("%s should have configured date
time context.", logicalTypeEnum.getValue());
+ return ctx.dateTimePattern;
+ }
+
+ // Depending on the logical type the processor handles, they use
different parsing context
+ // when they need to parse a timestamp string in handleStringValue.
+ private static class DateTimeParseContext {
+ public DateTimeParseContext(DateTimeFormatter dateTimeFormatter,
Pattern dateTimePattern) {
+ this.dateTimeFormatter = dateTimeFormatter;
+ this.dateTimePattern = dateTimePattern;
+ }
+
+ public final Pattern dateTimePattern;
+
+ public final DateTimeFormatter dateTimeFormatter;
+ }
+
+ private static final Map<AvroLogicalTypeEnum, DateTimeParseContext>
DATE_TIME_PARSE_CONTEXT_MAP = getParseContext();
+
+ private static Map<AvroLogicalTypeEnum, DateTimeParseContext>
getParseContext() {
+ // Formatter for parsing local timestamp. It ignores the time zone
info of the timestamp.
+ // No pattern is defined as ISO_INSTANT format internal is not clear.
+ DateTimeParseContext dateTimestampParseContext = new
DateTimeParseContext(
+ DateTimeFormatter.ISO_INSTANT,
+ null /* match everything*/);
+ // Formatter for parsing timestamp with time zone. The pattern is
derived from ISO_LOCAL_TIME definition.
+ // Pattern asserts the string is
+ // <optional sign><Hour>:<Minute> + optional <second> + optional
<fractional second>
Review Comment:
```suggestion
// Formatter for parsing a time of day. The pattern is derived from ISO_LOCAL_TIME definition.
// Pattern asserts the string is
// <optional sign><Hour>:<Minute> + optional <second> + optional <fractional second>
```
##########
hudi-common/src/main/java/org/apache/hudi/avro/MercifulJsonConverter.java:
##########
@@ -187,196 +180,772 @@ private static Object convertJsonToAvroField(Object
value, String name, Schema s
throw new HoodieJsonToAvroConversionException(null, name, schema,
shouldSanitize, invalidCharMask);
}
- JsonToAvroFieldProcessor processor =
FIELD_TYPE_PROCESSORS.get(schema.getType());
- if (null != processor) {
- return processor.convertToAvro(value, name, schema, shouldSanitize,
invalidCharMask);
- }
- throw new IllegalArgumentException("JsonConverter cannot handle type: " +
schema.getType());
+ return JsonToAvroFieldProcessorUtil.convertToAvro(value, name, schema,
shouldSanitize, invalidCharMask);
}
- /**
- * Base Class for converting json to avro fields.
- */
- private abstract static class JsonToAvroFieldProcessor implements
Serializable {
+ private static class JsonToAvroFieldProcessorUtil {
+ /**
+ * Base Class for converting json to avro fields.
+ */
+ private abstract static class JsonToAvroFieldProcessor implements
Serializable {
- public Object convertToAvro(Object value, String name, Schema schema,
boolean shouldSanitize, String invalidCharMask) {
- Pair<Boolean, Object> res = convert(value, name, schema, shouldSanitize,
invalidCharMask);
- if (!res.getLeft()) {
- throw new HoodieJsonToAvroConversionException(value, name, schema,
shouldSanitize, invalidCharMask);
+ public Object convertToAvro(Object value, String name, Schema schema,
boolean shouldSanitize, String invalidCharMask) {
+ Pair<Boolean, Object> res = convert(value, name, schema,
shouldSanitize, invalidCharMask);
+ if (!res.getLeft()) {
+ throw new HoodieJsonToAvroConversionException(value, name, schema,
shouldSanitize, invalidCharMask);
+ }
+ return res.getRight();
}
- return res.getRight();
+
+ protected abstract Pair<Boolean, Object> convert(Object value, String
name, Schema schema, boolean shouldSanitize, String invalidCharMask);
}
- protected abstract Pair<Boolean, Object> convert(Object value, String
name, Schema schema, boolean shouldSanitize, String invalidCharMask);
- }
+ public static Object convertToAvro(Object value, String name, Schema
schema, boolean shouldSanitize, String invalidCharMask) {
+ JsonToAvroFieldProcessor processor = getProcessorForSchema(schema);
+ return processor.convertToAvro(value, name, schema, shouldSanitize,
invalidCharMask);
+ }
+
+ private static JsonToAvroFieldProcessor getProcessorForSchema(Schema
schema) {
+ JsonToAvroFieldProcessor processor = null;
+
+ // 3 cases to consider: customized logicalType, logicalType, and type.
+ String customizedLogicalType = schema.getProp("logicalType");
+ LogicalType logicalType = schema.getLogicalType();
+ Type type = schema.getType();
+ if (customizedLogicalType != null && !customizedLogicalType.isEmpty()) {
+ processor =
AVRO_LOGICAL_TYPE_FIELD_PROCESSORS.get(customizedLogicalType);
+ } else if (logicalType != null) {
+ processor =
AVRO_LOGICAL_TYPE_FIELD_PROCESSORS.get(logicalType.getName());
+ } else {
+ processor = AVRO_TYPE_FIELD_TYPE_PROCESSORS.get(type);
+ }
+
+ ValidationUtils.checkArgument(
+ processor != null, String.format("JsonConverter cannot handle type:
%s", type));
+ return processor;
+ }
+
+ // Avro primitive and complex type processors.
+ private static final Map<Schema.Type, JsonToAvroFieldProcessor>
AVRO_TYPE_FIELD_TYPE_PROCESSORS = getFieldTypeProcessors();
+ // Avro logical type processors.
+ private static final Map<String, JsonToAvroFieldProcessor>
AVRO_LOGICAL_TYPE_FIELD_PROCESSORS = getLogicalFieldTypeProcessors();
+
+ /**
+ * Build type processor map for each avro type.
+ */
+ private static Map<Schema.Type, JsonToAvroFieldProcessor>
getFieldTypeProcessors() {
+ Map<Schema.Type, JsonToAvroFieldProcessor> fieldTypeProcessors = new
EnumMap<>(Schema.Type.class);
+ fieldTypeProcessors.put(Type.STRING, generateStringTypeHandler());
+ fieldTypeProcessors.put(Type.BOOLEAN, generateBooleanTypeHandler());
+ fieldTypeProcessors.put(Type.DOUBLE, generateDoubleTypeHandler());
+ fieldTypeProcessors.put(Type.FLOAT, generateFloatTypeHandler());
+ fieldTypeProcessors.put(Type.INT, generateIntTypeHandler());
+ fieldTypeProcessors.put(Type.LONG, generateLongTypeHandler());
+ fieldTypeProcessors.put(Type.ARRAY, generateArrayTypeHandler());
+ fieldTypeProcessors.put(Type.RECORD, generateRecordTypeHandler());
+ fieldTypeProcessors.put(Type.ENUM, generateEnumTypeHandler());
+ fieldTypeProcessors.put(Type.MAP, generateMapTypeHandler());
+ fieldTypeProcessors.put(Type.BYTES, generateBytesTypeHandler());
+ fieldTypeProcessors.put(Type.FIXED, generateFixedTypeHandler());
+ return Collections.unmodifiableMap(fieldTypeProcessors);
+ }
- private static JsonToAvroFieldProcessor generateBooleanTypeHandler() {
- return new JsonToAvroFieldProcessor() {
+ private static Map<String, JsonToAvroFieldProcessor>
getLogicalFieldTypeProcessors() {
+ return CollectionUtils.createImmutableMap(
+ Pair.of(AvroLogicalTypeEnum.DECIMAL.getValue(), new
DecimalLogicalTypeProcessor()),
+ Pair.of(AvroLogicalTypeEnum.TIME_MICROS.getValue(), new
TimeMicroLogicalTypeProcessor()),
+ Pair.of(AvroLogicalTypeEnum.TIME_MILLIS.getValue(), new
TimeMilliLogicalTypeProcessor()),
+ Pair.of(AvroLogicalTypeEnum.DATE.getValue(), new
DateLogicalTypeProcessor()),
+ Pair.of(AvroLogicalTypeEnum.LOCAL_TIMESTAMP_MICROS.getValue(), new
LocalTimestampMicroLogicalTypeProcessor()),
+ Pair.of(AvroLogicalTypeEnum.LOCAL_TIMESTAMP_MILLIS.getValue(), new
LocalTimestampMilliLogicalTypeProcessor()),
+ Pair.of(AvroLogicalTypeEnum.TIMESTAMP_MICROS.getValue(), new
TimestampMicroLogicalTypeProcessor()),
+ Pair.of(AvroLogicalTypeEnum.TIMESTAMP_MILLIS.getValue(), new
TimestampMilliLogicalTypeProcessor()),
+ Pair.of(AvroLogicalTypeEnum.DURATION.getValue(), new
DurationLogicalTypeProcessor()),
+ Pair.of(AvroLogicalTypeEnum.UUID.getValue(),
generateStringTypeHandler()));
+ }
+
+ private static class DecimalLogicalTypeProcessor extends
JsonToAvroFieldProcessor {
@Override
public Pair<Boolean, Object> convert(Object value, String name, Schema
schema, boolean shouldSanitize, String invalidCharMask) {
- if (value instanceof Boolean) {
- return Pair.of(true, value);
+
+ if (!isValidDecimalTypeConfig(schema)) {
+ return Pair.of(false, null);
+ }
+
+ // Case 1: Input is a list. It is expected to be raw Fixed byte array
input, and we only support
+ // parsing it to Fixed avro type.
+ if (value instanceof List<?> && schema.getType() == Type.FIXED) {
+ JsonToAvroFieldProcessor processor = generateFixedTypeHandler();
+ return processor.convert(value, name, schema, shouldSanitize,
invalidCharMask);
+ }
+
+ // Case 2: Input is a number or String number.
+ LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal)
schema.getLogicalType();
+ Pair<Boolean, BigDecimal> parseResult = parseObjectToBigDecimal(value);
+ if (Boolean.FALSE.equals(parseResult.getLeft())) {
+ return Pair.of(false, null);
+ }
+ BigDecimal bigDecimal = parseResult.getRight();
+
+ // As we don't do rounding, the validation will enforce the scale part
and the integer part are all within the
+ // limit. As a result, if scale is 2 precision is 5, we only allow 3
digits for the integer.
+ // Allowed: 123.45, 123, 0.12
+ // Disallowed: 1234 (4 digit integer while the scale has already
reserved 2 digit out of the 5 digit precision)
+ // 123456, 0.12345
+ if (bigDecimal.scale() > decimalType.getScale()
+ || (bigDecimal.precision() - bigDecimal.scale()) >
(decimalType.getPrecision() - decimalType.getScale())) {
+ // Correspond to case
+ // org.apache.avro.AvroTypeException: Cannot encode decimal with
scale 5 as scale 2 without rounding.
+ // org.apache.avro.AvroTypeException: Cannot encode decimal with
scale 3 as scale 2 without rounding
+ return Pair.of(false, null);
+ }
+
+ switch (schema.getType()) {
+ case BYTES:
+ // Convert to primitive Arvo type that logical type Decimal uses.
+ ByteBuffer byteBuffer = new
Conversions.DecimalConversion().toBytes(bigDecimal, schema, decimalType);
+ return Pair.of(true, byteBuffer);
+ case FIXED:
+ GenericFixed fixedValue = new
Conversions.DecimalConversion().toFixed(bigDecimal, schema, decimalType);
+ return Pair.of(true, fixedValue);
+ default: {
+ return Pair.of(false, null);
+ }
}
- return Pair.of(false, null);
}
- };
- }
- private static JsonToAvroFieldProcessor generateIntTypeHandler() {
- return new JsonToAvroFieldProcessor() {
- @Override
- public Pair<Boolean, Object> convert(Object value, String name, Schema
schema, boolean shouldSanitize, String invalidCharMask) {
- if (value instanceof Number) {
- return Pair.of(true, ((Number) value).intValue());
- } else if (value instanceof String) {
- return Pair.of(true, Integer.valueOf((String) value));
+ /**
+ * Check if the given schema is a valid decimal type configuration.
+ */
+ private static boolean isValidDecimalTypeConfig(Schema schema) {
+ LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal)
schema.getLogicalType();
+ // At the time when the schema is found not valid when it is parsed,
the Avro Schema.parse will just silently
+ // set the schema to be null instead of throwing exceptions.
Correspondingly, we just check if it is null here.
+ if (decimalType == null) {
+ return false;
}
- return Pair.of(false, null);
+ // Even though schema is validated at schema parsing phase, still
validate here to be defensive.
+ decimalType.validate(schema);
+ return true;
}
- };
- }
- private static JsonToAvroFieldProcessor generateDoubleTypeHandler() {
- return new JsonToAvroFieldProcessor() {
- @Override
- public Pair<Boolean, Object> convert(Object value, String name, Schema
schema, boolean shouldSanitize, String invalidCharMask) {
- if (value instanceof Number) {
- return Pair.of(true, ((Number) value).doubleValue());
- } else if (value instanceof String) {
- return Pair.of(true, Double.valueOf((String) value));
+ /**
+ * Parse the object to BigDecimal.
+ *
+ * @param obj Object to be parsed
+ * @return Pair object, with left as boolean indicating if the parsing
was successful and right as the
+ * BigDecimal value.
+ */
+ private static Pair<Boolean, BigDecimal> parseObjectToBigDecimal(Object
obj) {
+ // Case 1: Object is a number.
+ if (obj instanceof Number) {
+ return Pair.of(true, BigDecimal.valueOf(((Number)
obj).doubleValue()));
+ }
+
+ // Case 2: Object is a number in String format.
+ if (obj instanceof String) {
+ BigDecimal bigDecimal = null;
+ try {
+ bigDecimal = new BigDecimal(((String) obj));
+ } catch (java.lang.NumberFormatException ignored) {
+ /* ignore */
+ }
+ return Pair.of(bigDecimal != null, bigDecimal);
}
return Pair.of(false, null);
}
- };
- }
+ }
- private static JsonToAvroFieldProcessor generateFloatTypeHandler() {
- return new JsonToAvroFieldProcessor() {
+ private static class DurationLogicalTypeProcessor extends
JsonToAvroFieldProcessor {
+ private static final int NUM_ELEMENTS_FOR_DURATION_TYPE = 3;
+
+ /**
+ * We expect the input to be a list of 3 integers representing months,
days and milliseconds.
+ */
+ private boolean isValidDurationInput(Object value) {
+ if (!(value instanceof List<?>)) {
+ return false;
+ }
+ List<?> list = (List<?>) value;
+ if (list.size() != NUM_ELEMENTS_FOR_DURATION_TYPE) {
+ return false;
+ }
+ for (Object element : list) {
+ if (!(element instanceof Integer)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Convert the given object to Avro object with schema whose logical
type is duration.
+ */
@Override
public Pair<Boolean, Object> convert(Object value, String name, Schema
schema, boolean shouldSanitize, String invalidCharMask) {
- if (value instanceof Number) {
- return Pair.of(true, ((Number) value).floatValue());
- } else if (value instanceof String) {
- return Pair.of(true, Float.valueOf((String) value));
+
+ if (!isValidDurationTypeConfig(schema)) {
+ return Pair.of(false, null);
}
- return Pair.of(false, null);
+ if (!isValidDurationInput(value)) {
+ return Pair.of(false, null);
+ }
+ // After the validation the input can be safely cast to List<Integer>
with 3 elements.
+ List<?> list = (List<?>) value;
+ List<Integer> converval = list.stream()
+ .filter(Integer.class::isInstance)
+ .map(Integer.class::cast)
+ .collect(Collectors.toList());
+
+ ByteBuffer buffer =
ByteBuffer.allocate(schema.getFixedSize()).order(ByteOrder.LITTLE_ENDIAN);
+ for (Integer element : converval) {
+ buffer.putInt(element); // months
+ }
+ return Pair.of(true, new GenericData.Fixed(schema, buffer.array()));
+ }
+
+ /**
+ * Check if the given schema is a valid decimal type configuration.
+ */
+ private static boolean isValidDurationTypeConfig(Schema schema) {
+ String durationTypeName = AvroLogicalTypeEnum.DURATION.getValue();
+ LogicalType durationType = schema.getLogicalType();
+ String durationTypeProp = schema.getProp("logicalType");
+ // 1. The Avro type should be "Fixed".
+ // 2. Fixed size must be of 12 bytes as it hold 3 integers.
+ // 3. Logical type name should be "duration". The name might be stored
in different places based on Avro version
+ // being used here.
+ return schema.getType().equals(Type.FIXED)
+ && schema.getFixedSize() == Integer.BYTES *
NUM_ELEMENTS_FOR_DURATION_TYPE
+ && (durationType != null &&
durationType.getName().equals(durationTypeName)
+ || durationTypeProp != null &&
durationTypeProp.equals(durationTypeName));
+ }
+ }
+
+ /**
+ * Processor utility handling Number inputs. Consumed by
TimeLogicalTypeProcessor.
+ */
+ private interface NumericParser {
+ // Convert the input number to Avro data type according to the class
+ // implementing this interface.
+ Pair<Boolean, Object> handleNumberValue(Number value);
+
+ // Convert the input number to Avro data type according to the class
+ // implementing this interface.
+ // @param value the input number in string format.
+ Pair<Boolean, Object> handleStringNumber(String value);
+
+ interface IntParser extends NumericParser {
+ @Override
+ default Pair<Boolean, Object> handleNumberValue(Number value) {
+ return Pair.of(true, value.intValue());
+ }
+
+ @Override
+ default Pair<Boolean, Object> handleStringNumber(String value) {
+ return Pair.of(true, Integer.parseInt(value));
+ }
+ }
+
+ interface LongParser extends NumericParser {
+ @Override
+ default Pair<Boolean, Object> handleNumberValue(Number value) {
+ return Pair.of(true, value.longValue());
+ }
+
+ @Override
+ default Pair<Boolean, Object> handleStringNumber(String value) {
+ return Pair.of(true, Long.parseLong(value));
+ }
+ }
+ }
+
+ /**
+ * Base Class for converting object to avro logical type
TimeMilli/TimeMicro.
+ */
+ private abstract static class TimeLogicalTypeProcessor extends
JsonToAvroFieldProcessor implements NumericParser {
+
+ protected static final LocalDateTime LOCAL_UNIX_EPOCH =
LocalDateTime.of(1970, 1, 1, 0, 0, 0, 0);
+
+ // Logical type the processor is handling.
+ private final AvroLogicalTypeEnum logicalTypeEnum;
+
+ public TimeLogicalTypeProcessor(AvroLogicalTypeEnum logicalTypeEnum) {
+ this.logicalTypeEnum = logicalTypeEnum;
}
- };
- }
- private static JsonToAvroFieldProcessor generateLongTypeHandler() {
- return new JsonToAvroFieldProcessor() {
+ /**
+ * Main function that convert input to Object with java data type
specified by schema
+ */
@Override
public Pair<Boolean, Object> convert(Object value, String name, Schema
schema, boolean shouldSanitize, String invalidCharMask) {
+ LogicalType logicalType = schema.getLogicalType();
+ if (logicalType == null) {
+ return Pair.of(false, null);
+ }
+ logicalType.validate(schema);
if (value instanceof Number) {
- return Pair.of(true, ((Number) value).longValue());
- } else if (value instanceof String) {
- return Pair.of(true, Long.valueOf((String) value));
+ return handleNumberValue((Number) value);
+ }
+ if (value instanceof String) {
+ String valStr = (String) value;
+ if (ALL_DIGITS_WITH_OPTIONAL_SIGN.matcher(valStr).matches()) {
+ return handleStringNumber(valStr);
+ } else if (isWellFormedDateTime(valStr)) {
+ return handleStringValue(valStr);
+ }
}
return Pair.of(false, null);
}
- };
- }
- private static JsonToAvroFieldProcessor generateStringTypeHandler() {
- return new JsonToAvroFieldProcessor() {
+ // Handle the case when the input is a string that may be parsed as a
time.
+ protected abstract Pair<Boolean, Object> handleStringValue(String value);
+
+ protected DateTimeFormatter getDateTimeFormatter() {
+ DateTimeParseContext ctx =
DATE_TIME_PARSE_CONTEXT_MAP.get(logicalTypeEnum);
+ assert ctx != null : String.format("%s should have configured date
time context.", logicalTypeEnum.getValue());
+ return ctx.dateTimeFormatter;
+ }
+
+ protected Pattern getDateTimePattern() {
+ DateTimeParseContext ctx =
DATE_TIME_PARSE_CONTEXT_MAP.get(logicalTypeEnum);
+ assert ctx != null : String.format("%s should have configured date
time context.", logicalTypeEnum.getValue());
+ return ctx.dateTimePattern;
+ }
+
+ // Depending on the logical type the processor handles, they use
different parsing context
+ // when they need to parse a timestamp string in handleStringValue.
+ private static class DateTimeParseContext {
+ public DateTimeParseContext(DateTimeFormatter dateTimeFormatter,
Pattern dateTimePattern) {
+ this.dateTimeFormatter = dateTimeFormatter;
+ this.dateTimePattern = dateTimePattern;
+ }
+
+ public final Pattern dateTimePattern;
+
+ public final DateTimeFormatter dateTimeFormatter;
+ }
+
+ private static final Map<AvroLogicalTypeEnum, DateTimeParseContext>
DATE_TIME_PARSE_CONTEXT_MAP = getParseContext();
+
+ private static Map<AvroLogicalTypeEnum, DateTimeParseContext>
getParseContext() {
+ // Formatter for parsing local timestamp. It ignores the time zone
info of the timestamp.
+ // No pattern is defined as ISO_INSTANT format internal is not clear.
+ DateTimeParseContext dateTimestampParseContext = new
DateTimeParseContext(
+ DateTimeFormatter.ISO_INSTANT,
+ null /* match everything*/);
+ // Formatter for parsing timestamp with time zone. The pattern is
derived from ISO_LOCAL_TIME definition.
+ // Pattern asserts the string is
+ // <optional sign><Hour>:<Minute> + optional <second> + optional
<fractional second>
+ DateTimeParseContext dateTimeParseContext = new DateTimeParseContext(
+ DateTimeFormatter.ISO_LOCAL_TIME,
+
Pattern.compile("^[+-]?\\d{2}:\\d{2}(?::\\d{2}(?:\\.\\d{1,9})?)?"));
+ // Formatter for parsing local time. The pattern is derived from
ISO_LOCAL_TIME definition.
+ // Pattern asserts the string is
+ // <optional sign><Year>-<Month>-<Day>T<Hour>:<Minute> + optional
<second> + optional <fractional second>
+ DateTimeParseContext localTimestampParseContext = new
DateTimeParseContext(
+ DateTimeFormatter.ISO_LOCAL_DATE_TIME,
+
Pattern.compile("^[+-]?\\d{4,10}-\\d{2}-\\d{2}T\\d{2}:\\d{2}(?::\\d{2}(?:\\.\\d{1,9})?)?")
+ );
+ // Formatter for parsing local date. The pattern is derived from
ISO_LOCAL_DATE definition.
+ // Pattern asserts the string is
+ // <optional sign><Year>-<Month>-<Day>
Review Comment:
```suggestion
// Formatter for parsing a date. The pattern is derived from ISO_LOCAL_DATE definition.
// Pattern asserts the string is
// <optional sign><Year>-<Month>-<Day>
```
##########
hudi-common/src/main/java/org/apache/hudi/avro/MercifulJsonConverter.java:
##########
@@ -187,196 +180,772 @@ private static Object convertJsonToAvroField(Object
value, String name, Schema s
throw new HoodieJsonToAvroConversionException(null, name, schema,
shouldSanitize, invalidCharMask);
}
- JsonToAvroFieldProcessor processor =
FIELD_TYPE_PROCESSORS.get(schema.getType());
- if (null != processor) {
- return processor.convertToAvro(value, name, schema, shouldSanitize,
invalidCharMask);
- }
- throw new IllegalArgumentException("JsonConverter cannot handle type: " +
schema.getType());
+ return JsonToAvroFieldProcessorUtil.convertToAvro(value, name, schema,
shouldSanitize, invalidCharMask);
}
- /**
- * Base Class for converting json to avro fields.
- */
- private abstract static class JsonToAvroFieldProcessor implements
Serializable {
+ private static class JsonToAvroFieldProcessorUtil {
+ /**
+ * Base Class for converting json to avro fields.
+ */
+ private abstract static class JsonToAvroFieldProcessor implements
Serializable {
- public Object convertToAvro(Object value, String name, Schema schema,
boolean shouldSanitize, String invalidCharMask) {
- Pair<Boolean, Object> res = convert(value, name, schema, shouldSanitize,
invalidCharMask);
- if (!res.getLeft()) {
- throw new HoodieJsonToAvroConversionException(value, name, schema,
shouldSanitize, invalidCharMask);
+ public Object convertToAvro(Object value, String name, Schema schema,
boolean shouldSanitize, String invalidCharMask) {
+ Pair<Boolean, Object> res = convert(value, name, schema,
shouldSanitize, invalidCharMask);
+ if (!res.getLeft()) {
+ throw new HoodieJsonToAvroConversionException(value, name, schema,
shouldSanitize, invalidCharMask);
+ }
+ return res.getRight();
}
- return res.getRight();
+
+ protected abstract Pair<Boolean, Object> convert(Object value, String
name, Schema schema, boolean shouldSanitize, String invalidCharMask);
}
- protected abstract Pair<Boolean, Object> convert(Object value, String
name, Schema schema, boolean shouldSanitize, String invalidCharMask);
- }
+ public static Object convertToAvro(Object value, String name, Schema
schema, boolean shouldSanitize, String invalidCharMask) {
+ JsonToAvroFieldProcessor processor = getProcessorForSchema(schema);
+ return processor.convertToAvro(value, name, schema, shouldSanitize,
invalidCharMask);
+ }
+
+ private static JsonToAvroFieldProcessor getProcessorForSchema(Schema
schema) {
+ JsonToAvroFieldProcessor processor = null;
+
+ // 3 cases to consider: customized logicalType, logicalType, and type.
+ String customizedLogicalType = schema.getProp("logicalType");
+ LogicalType logicalType = schema.getLogicalType();
+ Type type = schema.getType();
+ if (customizedLogicalType != null && !customizedLogicalType.isEmpty()) {
+ processor =
AVRO_LOGICAL_TYPE_FIELD_PROCESSORS.get(customizedLogicalType);
+ } else if (logicalType != null) {
+ processor =
AVRO_LOGICAL_TYPE_FIELD_PROCESSORS.get(logicalType.getName());
+ } else {
+ processor = AVRO_TYPE_FIELD_TYPE_PROCESSORS.get(type);
+ }
+
+ ValidationUtils.checkArgument(
+ processor != null, String.format("JsonConverter cannot handle type:
%s", type));
+ return processor;
+ }
+
+ // Avro primitive and complex type processors.
+ private static final Map<Schema.Type, JsonToAvroFieldProcessor>
AVRO_TYPE_FIELD_TYPE_PROCESSORS = getFieldTypeProcessors();
+ // Avro logical type processors.
+ private static final Map<String, JsonToAvroFieldProcessor>
AVRO_LOGICAL_TYPE_FIELD_PROCESSORS = getLogicalFieldTypeProcessors();
+
+ /**
+ * Build type processor map for each avro type.
+ */
+ private static Map<Schema.Type, JsonToAvroFieldProcessor>
getFieldTypeProcessors() {
+ Map<Schema.Type, JsonToAvroFieldProcessor> fieldTypeProcessors = new
EnumMap<>(Schema.Type.class);
+ fieldTypeProcessors.put(Type.STRING, generateStringTypeHandler());
+ fieldTypeProcessors.put(Type.BOOLEAN, generateBooleanTypeHandler());
+ fieldTypeProcessors.put(Type.DOUBLE, generateDoubleTypeHandler());
+ fieldTypeProcessors.put(Type.FLOAT, generateFloatTypeHandler());
+ fieldTypeProcessors.put(Type.INT, generateIntTypeHandler());
+ fieldTypeProcessors.put(Type.LONG, generateLongTypeHandler());
+ fieldTypeProcessors.put(Type.ARRAY, generateArrayTypeHandler());
+ fieldTypeProcessors.put(Type.RECORD, generateRecordTypeHandler());
+ fieldTypeProcessors.put(Type.ENUM, generateEnumTypeHandler());
+ fieldTypeProcessors.put(Type.MAP, generateMapTypeHandler());
+ fieldTypeProcessors.put(Type.BYTES, generateBytesTypeHandler());
+ fieldTypeProcessors.put(Type.FIXED, generateFixedTypeHandler());
+ return Collections.unmodifiableMap(fieldTypeProcessors);
+ }
- private static JsonToAvroFieldProcessor generateBooleanTypeHandler() {
- return new JsonToAvroFieldProcessor() {
+ private static Map<String, JsonToAvroFieldProcessor>
getLogicalFieldTypeProcessors() {
+ return CollectionUtils.createImmutableMap(
+ Pair.of(AvroLogicalTypeEnum.DECIMAL.getValue(), new
DecimalLogicalTypeProcessor()),
+ Pair.of(AvroLogicalTypeEnum.TIME_MICROS.getValue(), new
TimeMicroLogicalTypeProcessor()),
+ Pair.of(AvroLogicalTypeEnum.TIME_MILLIS.getValue(), new
TimeMilliLogicalTypeProcessor()),
+ Pair.of(AvroLogicalTypeEnum.DATE.getValue(), new
DateLogicalTypeProcessor()),
+ Pair.of(AvroLogicalTypeEnum.LOCAL_TIMESTAMP_MICROS.getValue(), new
LocalTimestampMicroLogicalTypeProcessor()),
+ Pair.of(AvroLogicalTypeEnum.LOCAL_TIMESTAMP_MILLIS.getValue(), new
LocalTimestampMilliLogicalTypeProcessor()),
+ Pair.of(AvroLogicalTypeEnum.TIMESTAMP_MICROS.getValue(), new
TimestampMicroLogicalTypeProcessor()),
+ Pair.of(AvroLogicalTypeEnum.TIMESTAMP_MILLIS.getValue(), new
TimestampMilliLogicalTypeProcessor()),
+ Pair.of(AvroLogicalTypeEnum.DURATION.getValue(), new
DurationLogicalTypeProcessor()),
+ Pair.of(AvroLogicalTypeEnum.UUID.getValue(),
generateStringTypeHandler()));
+ }
+
+ private static class DecimalLogicalTypeProcessor extends
JsonToAvroFieldProcessor {
@Override
public Pair<Boolean, Object> convert(Object value, String name, Schema
schema, boolean shouldSanitize, String invalidCharMask) {
- if (value instanceof Boolean) {
- return Pair.of(true, value);
+
+ if (!isValidDecimalTypeConfig(schema)) {
+ return Pair.of(false, null);
+ }
+
+ // Case 1: Input is a list. It is expected to be raw Fixed byte array
input, and we only support
+ // parsing it to Fixed avro type.
+ if (value instanceof List<?> && schema.getType() == Type.FIXED) {
+ JsonToAvroFieldProcessor processor = generateFixedTypeHandler();
+ return processor.convert(value, name, schema, shouldSanitize,
invalidCharMask);
+ }
+
+ // Case 2: Input is a number or String number.
+ LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal)
schema.getLogicalType();
+ Pair<Boolean, BigDecimal> parseResult = parseObjectToBigDecimal(value);
+ if (Boolean.FALSE.equals(parseResult.getLeft())) {
+ return Pair.of(false, null);
+ }
+ BigDecimal bigDecimal = parseResult.getRight();
+
+ // As we don't do rounding, the validation will enforce the scale part
and the integer part are all within the
+ // limit. As a result, if scale is 2 precision is 5, we only allow 3
digits for the integer.
+ // Allowed: 123.45, 123, 0.12
+ // Disallowed: 1234 (4 digit integer while the scale has already
reserved 2 digit out of the 5 digit precision)
+ // 123456, 0.12345
+ if (bigDecimal.scale() > decimalType.getScale()
+ || (bigDecimal.precision() - bigDecimal.scale()) >
(decimalType.getPrecision() - decimalType.getScale())) {
+ // Correspond to case
+ // org.apache.avro.AvroTypeException: Cannot encode decimal with
scale 5 as scale 2 without rounding.
+ // org.apache.avro.AvroTypeException: Cannot encode decimal with
scale 3 as scale 2 without rounding
+ return Pair.of(false, null);
+ }
+
+ switch (schema.getType()) {
+ case BYTES:
+ // Convert to primitive Avro type that logical type Decimal uses.
+ ByteBuffer byteBuffer = new
Conversions.DecimalConversion().toBytes(bigDecimal, schema, decimalType);
+ return Pair.of(true, byteBuffer);
+ case FIXED:
+ GenericFixed fixedValue = new
Conversions.DecimalConversion().toFixed(bigDecimal, schema, decimalType);
+ return Pair.of(true, fixedValue);
+ default: {
+ return Pair.of(false, null);
+ }
}
- return Pair.of(false, null);
}
- };
- }
- private static JsonToAvroFieldProcessor generateIntTypeHandler() {
- return new JsonToAvroFieldProcessor() {
- @Override
- public Pair<Boolean, Object> convert(Object value, String name, Schema
schema, boolean shouldSanitize, String invalidCharMask) {
- if (value instanceof Number) {
- return Pair.of(true, ((Number) value).intValue());
- } else if (value instanceof String) {
- return Pair.of(true, Integer.valueOf((String) value));
+ /**
+ * Check if the given schema is a valid decimal type configuration.
+ */
+ private static boolean isValidDecimalTypeConfig(Schema schema) {
+ LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal)
schema.getLogicalType();
+ // When the logical type is found to be invalid while the schema is parsed,
Avro's Schema.parse silently
+ // sets the logical type to null instead of throwing an exception.
Correspondingly, we just check if it is null here.
+ if (decimalType == null) {
+ return false;
}
- return Pair.of(false, null);
+ // Even though schema is validated at schema parsing phase, still
validate here to be defensive.
+ decimalType.validate(schema);
+ return true;
}
- };
- }
- private static JsonToAvroFieldProcessor generateDoubleTypeHandler() {
- return new JsonToAvroFieldProcessor() {
- @Override
- public Pair<Boolean, Object> convert(Object value, String name, Schema
schema, boolean shouldSanitize, String invalidCharMask) {
- if (value instanceof Number) {
- return Pair.of(true, ((Number) value).doubleValue());
- } else if (value instanceof String) {
- return Pair.of(true, Double.valueOf((String) value));
+ /**
+ * Parse the object to BigDecimal.
+ *
+ * @param obj Object to be parsed
+ * @return Pair object, with left as boolean indicating if the parsing
was successful and right as the
+ * BigDecimal value.
+ */
+ private static Pair<Boolean, BigDecimal> parseObjectToBigDecimal(Object
obj) {
+ // Case 1: Object is a number.
+ if (obj instanceof Number) {
+ return Pair.of(true, BigDecimal.valueOf(((Number)
obj).doubleValue()));
+ }
+
+ // Case 2: Object is a number in String format.
+ if (obj instanceof String) {
+ BigDecimal bigDecimal = null;
+ try {
+ bigDecimal = new BigDecimal(((String) obj));
+ } catch (java.lang.NumberFormatException ignored) {
+ /* ignore */
+ }
+ return Pair.of(bigDecimal != null, bigDecimal);
}
return Pair.of(false, null);
}
- };
- }
+ }
- private static JsonToAvroFieldProcessor generateFloatTypeHandler() {
- return new JsonToAvroFieldProcessor() {
+ private static class DurationLogicalTypeProcessor extends
JsonToAvroFieldProcessor {
+ private static final int NUM_ELEMENTS_FOR_DURATION_TYPE = 3;
+
+ /**
+ * We expect the input to be a list of 3 integers representing months,
days and milliseconds.
+ */
+ private boolean isValidDurationInput(Object value) {
+ if (!(value instanceof List<?>)) {
+ return false;
+ }
+ List<?> list = (List<?>) value;
+ if (list.size() != NUM_ELEMENTS_FOR_DURATION_TYPE) {
+ return false;
+ }
+ for (Object element : list) {
+ if (!(element instanceof Integer)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Convert the given object to Avro object with schema whose logical
type is duration.
+ */
@Override
public Pair<Boolean, Object> convert(Object value, String name, Schema
schema, boolean shouldSanitize, String invalidCharMask) {
- if (value instanceof Number) {
- return Pair.of(true, ((Number) value).floatValue());
- } else if (value instanceof String) {
- return Pair.of(true, Float.valueOf((String) value));
+
+ if (!isValidDurationTypeConfig(schema)) {
+ return Pair.of(false, null);
}
- return Pair.of(false, null);
+ if (!isValidDurationInput(value)) {
+ return Pair.of(false, null);
+ }
+ // After the validation the input can be safely cast to List<Integer>
with 3 elements.
+ List<?> list = (List<?>) value;
+ List<Integer> converval = list.stream()
+ .filter(Integer.class::isInstance)
+ .map(Integer.class::cast)
+ .collect(Collectors.toList());
+
+ ByteBuffer buffer =
ByteBuffer.allocate(schema.getFixedSize()).order(ByteOrder.LITTLE_ENDIAN);
+ for (Integer element : converval) {
+ buffer.putInt(element); // months
+ }
+ return Pair.of(true, new GenericData.Fixed(schema, buffer.array()));
+ }
+
+ /**
+ * Check if the given schema is a valid duration type configuration.
+ */
+ private static boolean isValidDurationTypeConfig(Schema schema) {
+ String durationTypeName = AvroLogicalTypeEnum.DURATION.getValue();
+ LogicalType durationType = schema.getLogicalType();
+ String durationTypeProp = schema.getProp("logicalType");
+ // 1. The Avro type should be "Fixed".
+ // 2. Fixed size must be 12 bytes as it holds 3 integers.
+ // 3. Logical type name should be "duration". The name might be stored
in different places based on Avro version
+ // being used here.
+ return schema.getType().equals(Type.FIXED)
+ && schema.getFixedSize() == Integer.BYTES *
NUM_ELEMENTS_FOR_DURATION_TYPE
+ && (durationType != null &&
durationType.getName().equals(durationTypeName)
+ || durationTypeProp != null &&
durationTypeProp.equals(durationTypeName));
+ }
+ }
+
+ /**
+ * Processor utility handling Number inputs. Consumed by
TimeLogicalTypeProcessor.
+ */
+ private interface NumericParser {
+ // Convert the input number to Avro data type according to the class
+ // implementing this interface.
+ Pair<Boolean, Object> handleNumberValue(Number value);
+
+ // Convert the input number to Avro data type according to the class
+ // implementing this interface.
+ // @param value the input number in string format.
+ Pair<Boolean, Object> handleStringNumber(String value);
+
+ interface IntParser extends NumericParser {
+ @Override
+ default Pair<Boolean, Object> handleNumberValue(Number value) {
+ return Pair.of(true, value.intValue());
+ }
+
+ @Override
+ default Pair<Boolean, Object> handleStringNumber(String value) {
+ return Pair.of(true, Integer.parseInt(value));
+ }
+ }
+
+ interface LongParser extends NumericParser {
+ @Override
+ default Pair<Boolean, Object> handleNumberValue(Number value) {
+ return Pair.of(true, value.longValue());
+ }
+
+ @Override
+ default Pair<Boolean, Object> handleStringNumber(String value) {
+ return Pair.of(true, Long.parseLong(value));
+ }
+ }
+ }
+
+ /**
+ * Base Class for converting object to avro logical type
TimeMilli/TimeMicro.
+ */
+ private abstract static class TimeLogicalTypeProcessor extends
JsonToAvroFieldProcessor implements NumericParser {
+
+ protected static final LocalDateTime LOCAL_UNIX_EPOCH =
LocalDateTime.of(1970, 1, 1, 0, 0, 0, 0);
+
+ // Logical type the processor is handling.
+ private final AvroLogicalTypeEnum logicalTypeEnum;
+
+ public TimeLogicalTypeProcessor(AvroLogicalTypeEnum logicalTypeEnum) {
+ this.logicalTypeEnum = logicalTypeEnum;
}
- };
- }
- private static JsonToAvroFieldProcessor generateLongTypeHandler() {
- return new JsonToAvroFieldProcessor() {
+ /**
+ * Main function that converts the input to an Object with the Java data type
specified by the schema.
+ */
@Override
public Pair<Boolean, Object> convert(Object value, String name, Schema
schema, boolean shouldSanitize, String invalidCharMask) {
+ LogicalType logicalType = schema.getLogicalType();
+ if (logicalType == null) {
+ return Pair.of(false, null);
+ }
+ logicalType.validate(schema);
if (value instanceof Number) {
- return Pair.of(true, ((Number) value).longValue());
- } else if (value instanceof String) {
- return Pair.of(true, Long.valueOf((String) value));
+ return handleNumberValue((Number) value);
+ }
+ if (value instanceof String) {
+ String valStr = (String) value;
+ if (ALL_DIGITS_WITH_OPTIONAL_SIGN.matcher(valStr).matches()) {
+ return handleStringNumber(valStr);
+ } else if (isWellFormedDateTime(valStr)) {
+ return handleStringValue(valStr);
+ }
}
return Pair.of(false, null);
}
- };
- }
- private static JsonToAvroFieldProcessor generateStringTypeHandler() {
- return new JsonToAvroFieldProcessor() {
+ // Handle the case when the input is a string that may be parsed as a
time.
+ protected abstract Pair<Boolean, Object> handleStringValue(String value);
+
+ protected DateTimeFormatter getDateTimeFormatter() {
+ DateTimeParseContext ctx =
DATE_TIME_PARSE_CONTEXT_MAP.get(logicalTypeEnum);
+ assert ctx != null : String.format("%s should have configured date
time context.", logicalTypeEnum.getValue());
+ return ctx.dateTimeFormatter;
+ }
+
+ protected Pattern getDateTimePattern() {
+ DateTimeParseContext ctx =
DATE_TIME_PARSE_CONTEXT_MAP.get(logicalTypeEnum);
+ assert ctx != null : String.format("%s should have configured date
time context.", logicalTypeEnum.getValue());
+ return ctx.dateTimePattern;
+ }
+
+ // Depending on the logical type the processor handles, they use
different parsing context
+ // when they need to parse a timestamp string in handleStringValue.
+ private static class DateTimeParseContext {
+ public DateTimeParseContext(DateTimeFormatter dateTimeFormatter,
Pattern dateTimePattern) {
+ this.dateTimeFormatter = dateTimeFormatter;
+ this.dateTimePattern = dateTimePattern;
+ }
+
+ public final Pattern dateTimePattern;
+
+ public final DateTimeFormatter dateTimeFormatter;
+ }
+
+ private static final Map<AvroLogicalTypeEnum, DateTimeParseContext>
DATE_TIME_PARSE_CONTEXT_MAP = getParseContext();
+
+ private static Map<AvroLogicalTypeEnum, DateTimeParseContext>
getParseContext() {
+ // Formatter for parsing timestamps (timestamp-millis/timestamp-micros) as
instants on the UTC time-line.
+ // No pattern is defined because the exact input format accepted by ISO_INSTANT is not clear.
+ DateTimeParseContext dateTimestampParseContext = new
DateTimeParseContext(
+ DateTimeFormatter.ISO_INSTANT,
+ null /* match everything*/);
+ // Formatter for parsing time of day (time-millis/time-micros). The pattern is
derived from ISO_LOCAL_TIME definition.
+ // Pattern asserts the string is
+ // <optional sign><Hour>:<Minute> + optional <second> + optional
<fractional second>
+ DateTimeParseContext dateTimeParseContext = new DateTimeParseContext(
+ DateTimeFormatter.ISO_LOCAL_TIME,
+
Pattern.compile("^[+-]?\\d{2}:\\d{2}(?::\\d{2}(?:\\.\\d{1,9})?)?"));
+ // Formatter for parsing local timestamp. The pattern is derived from
ISO_LOCAL_DATE_TIME definition.
+ // Pattern asserts the string is
+ // <optional sign><Year>-<Month>-<Day>T<Hour>:<Minute> + optional
<second> + optional <fractional second>
+ DateTimeParseContext localTimestampParseContext = new
DateTimeParseContext(
+ DateTimeFormatter.ISO_LOCAL_DATE_TIME,
+
Pattern.compile("^[+-]?\\d{4,10}-\\d{2}-\\d{2}T\\d{2}:\\d{2}(?::\\d{2}(?:\\.\\d{1,9})?)?")
+ );
+ // Formatter for parsing local date. The pattern is derived from
ISO_LOCAL_DATE definition.
+ // Pattern asserts the string is
+ // <optional sign><Year>-<Month>-<Day>
+ DateTimeParseContext localDateParseContext = new DateTimeParseContext(
+ DateTimeFormatter.ISO_LOCAL_DATE,
+ Pattern.compile("^[+-]?\\d{4,10}-\\d{2}-\\d{2}?")
+ );
+
+ EnumMap<AvroLogicalTypeEnum, DateTimeParseContext> ctx = new
EnumMap<>(AvroLogicalTypeEnum.class);
+ ctx.put(AvroLogicalTypeEnum.TIME_MICROS, dateTimeParseContext);
+ ctx.put(AvroLogicalTypeEnum.TIME_MILLIS, dateTimeParseContext);
+ ctx.put(AvroLogicalTypeEnum.DATE, localDateParseContext);
+ ctx.put(AvroLogicalTypeEnum.LOCAL_TIMESTAMP_MICROS,
localTimestampParseContext);
+ ctx.put(AvroLogicalTypeEnum.LOCAL_TIMESTAMP_MILLIS,
localTimestampParseContext);
+ ctx.put(AvroLogicalTypeEnum.TIMESTAMP_MICROS,
dateTimestampParseContext);
+ ctx.put(AvroLogicalTypeEnum.TIMESTAMP_MILLIS,
dateTimestampParseContext);
+ return Collections.unmodifiableMap(ctx);
+ }
+
+ // Pattern validating whether the value is a number in string form.
+ // Only check at most 19 digits, as this is the max number of digits in
Long.MAX_VALUE, to bound the cost of regex matching.
+ protected static final Pattern ALL_DIGITS_WITH_OPTIONAL_SIGN =
Pattern.compile("^[-+]?\\d{1,19}$");
+
+ /**
+ * Check if the given string is a well-formed date time string.
+ * If no pattern is defined, it will always return true.
+ */
+ private boolean isWellFormedDateTime(String value) {
+ Pattern pattern = getDateTimePattern();
+ return pattern == null || pattern.matcher(value).matches();
+ }
+
+ protected Pair<Boolean, Instant> convertToInstantTime(String input) {
+ // Parse the input timestamp, DateTimeFormatter.ISO_INSTANT is implied
here
+ Instant time = null;
+ try {
+ time = Instant.parse(input);
+ } catch (DateTimeParseException ignore) {
+ /* ignore */
+ }
+ return Pair.of(time != null, time);
+ }
+
+ protected Pair<Boolean, LocalTime> convertToLocalTime(String input) {
+ // Parse the input timestamp, DateTimeFormatter.ISO_LOCAL_TIME is
implied here
+ LocalTime time = null;
+ try {
+ // Try parsing as an ISO local time
+ time = LocalTime.parse(input);
+ } catch (DateTimeParseException ignore) {
+ /* ignore */
+ }
+ return Pair.of(time != null, time);
+ }
+
+ protected Pair<Boolean, LocalDateTime> convertToLocalDateTime(String
input) {
+ // Parse the input timestamp, DateTimeFormatter.ISO_LOCAL_DATE_TIME is
implied here
+ LocalDateTime time = null;
+ try {
+ // Try parsing as an ISO local date-time
+ time = LocalDateTime.parse(input, getDateTimeFormatter());
+ } catch (DateTimeParseException ignore) {
+ /* ignore */
+ }
+ return Pair.of(time != null, time);
+ }
+ }
+
+ private static class DateLogicalTypeProcessor extends
TimeLogicalTypeProcessor
+ implements NumericParser.IntParser {
+ public DateLogicalTypeProcessor() {
+ super(AvroLogicalTypeEnum.DATE);
+ }
+
@Override
- public Pair<Boolean, Object> convert(Object value, String name, Schema
schema, boolean shouldSanitize, String invalidCharMask) {
- return Pair.of(true, value.toString());
+ public Pair<Boolean, Object> handleStringValue(String value) {
+ Pair<Boolean, LocalDate> result = convertToLocalDate(value);
+ if (!result.getLeft()) {
+ return Pair.of(false, null);
+ }
+ LocalDate date = result.getRight();
+ int daysSinceEpoch = (int)
ChronoUnit.DAYS.between(LocalDate.ofEpochDay(0), date);
+ return Pair.of(true, daysSinceEpoch);
+ }
+
+ private Pair<Boolean, LocalDate> convertToLocalDate(String input) {
+ // Parse the input timestamp, DateTimeFormatter.ISO_LOCAL_TIME is
implied here
Review Comment:
```suggestion
// Parse the input date string, DateTimeFormatter.ISO_LOCAL_DATE is implied here
```
##########
hudi-common/src/main/java/org/apache/hudi/avro/MercifulJsonConverter.java:
##########
@@ -187,196 +180,772 @@ private static Object convertJsonToAvroField(Object
value, String name, Schema s
throw new HoodieJsonToAvroConversionException(null, name, schema,
shouldSanitize, invalidCharMask);
}
- JsonToAvroFieldProcessor processor =
FIELD_TYPE_PROCESSORS.get(schema.getType());
- if (null != processor) {
- return processor.convertToAvro(value, name, schema, shouldSanitize,
invalidCharMask);
- }
- throw new IllegalArgumentException("JsonConverter cannot handle type: " +
schema.getType());
+ return JsonToAvroFieldProcessorUtil.convertToAvro(value, name, schema,
shouldSanitize, invalidCharMask);
}
- /**
- * Base Class for converting json to avro fields.
- */
- private abstract static class JsonToAvroFieldProcessor implements
Serializable {
+ private static class JsonToAvroFieldProcessorUtil {
+ /**
+ * Base Class for converting json to avro fields.
+ */
+ private abstract static class JsonToAvroFieldProcessor implements
Serializable {
- public Object convertToAvro(Object value, String name, Schema schema,
boolean shouldSanitize, String invalidCharMask) {
- Pair<Boolean, Object> res = convert(value, name, schema, shouldSanitize,
invalidCharMask);
- if (!res.getLeft()) {
- throw new HoodieJsonToAvroConversionException(value, name, schema,
shouldSanitize, invalidCharMask);
+ public Object convertToAvro(Object value, String name, Schema schema,
boolean shouldSanitize, String invalidCharMask) {
+ Pair<Boolean, Object> res = convert(value, name, schema,
shouldSanitize, invalidCharMask);
+ if (!res.getLeft()) {
+ throw new HoodieJsonToAvroConversionException(value, name, schema,
shouldSanitize, invalidCharMask);
+ }
+ return res.getRight();
}
- return res.getRight();
+
+ protected abstract Pair<Boolean, Object> convert(Object value, String
name, Schema schema, boolean shouldSanitize, String invalidCharMask);
}
- protected abstract Pair<Boolean, Object> convert(Object value, String
name, Schema schema, boolean shouldSanitize, String invalidCharMask);
- }
+ public static Object convertToAvro(Object value, String name, Schema
schema, boolean shouldSanitize, String invalidCharMask) {
+ JsonToAvroFieldProcessor processor = getProcessorForSchema(schema);
+ return processor.convertToAvro(value, name, schema, shouldSanitize,
invalidCharMask);
+ }
+
+ private static JsonToAvroFieldProcessor getProcessorForSchema(Schema
schema) {
+ JsonToAvroFieldProcessor processor = null;
+
+ // 3 cases to consider: customized logicalType, logicalType, and type.
+ String customizedLogicalType = schema.getProp("logicalType");
+ LogicalType logicalType = schema.getLogicalType();
+ Type type = schema.getType();
+ if (customizedLogicalType != null && !customizedLogicalType.isEmpty()) {
+ processor =
AVRO_LOGICAL_TYPE_FIELD_PROCESSORS.get(customizedLogicalType);
+ } else if (logicalType != null) {
+ processor =
AVRO_LOGICAL_TYPE_FIELD_PROCESSORS.get(logicalType.getName());
+ } else {
+ processor = AVRO_TYPE_FIELD_TYPE_PROCESSORS.get(type);
+ }
+
+ ValidationUtils.checkArgument(
+ processor != null, String.format("JsonConverter cannot handle type:
%s", type));
+ return processor;
+ }
+
+ // Avro primitive and complex type processors.
+ private static final Map<Schema.Type, JsonToAvroFieldProcessor>
AVRO_TYPE_FIELD_TYPE_PROCESSORS = getFieldTypeProcessors();
+ // Avro logical type processors.
+ private static final Map<String, JsonToAvroFieldProcessor>
AVRO_LOGICAL_TYPE_FIELD_PROCESSORS = getLogicalFieldTypeProcessors();
+
+ /**
+ * Build type processor map for each avro type.
+ */
+ private static Map<Schema.Type, JsonToAvroFieldProcessor>
getFieldTypeProcessors() {
+ Map<Schema.Type, JsonToAvroFieldProcessor> fieldTypeProcessors = new
EnumMap<>(Schema.Type.class);
+ fieldTypeProcessors.put(Type.STRING, generateStringTypeHandler());
+ fieldTypeProcessors.put(Type.BOOLEAN, generateBooleanTypeHandler());
+ fieldTypeProcessors.put(Type.DOUBLE, generateDoubleTypeHandler());
+ fieldTypeProcessors.put(Type.FLOAT, generateFloatTypeHandler());
+ fieldTypeProcessors.put(Type.INT, generateIntTypeHandler());
+ fieldTypeProcessors.put(Type.LONG, generateLongTypeHandler());
+ fieldTypeProcessors.put(Type.ARRAY, generateArrayTypeHandler());
+ fieldTypeProcessors.put(Type.RECORD, generateRecordTypeHandler());
+ fieldTypeProcessors.put(Type.ENUM, generateEnumTypeHandler());
+ fieldTypeProcessors.put(Type.MAP, generateMapTypeHandler());
+ fieldTypeProcessors.put(Type.BYTES, generateBytesTypeHandler());
+ fieldTypeProcessors.put(Type.FIXED, generateFixedTypeHandler());
+ return Collections.unmodifiableMap(fieldTypeProcessors);
+ }
- private static JsonToAvroFieldProcessor generateBooleanTypeHandler() {
- return new JsonToAvroFieldProcessor() {
+ private static Map<String, JsonToAvroFieldProcessor>
getLogicalFieldTypeProcessors() {
+ return CollectionUtils.createImmutableMap(
+ Pair.of(AvroLogicalTypeEnum.DECIMAL.getValue(), new
DecimalLogicalTypeProcessor()),
+ Pair.of(AvroLogicalTypeEnum.TIME_MICROS.getValue(), new
TimeMicroLogicalTypeProcessor()),
+ Pair.of(AvroLogicalTypeEnum.TIME_MILLIS.getValue(), new
TimeMilliLogicalTypeProcessor()),
+ Pair.of(AvroLogicalTypeEnum.DATE.getValue(), new
DateLogicalTypeProcessor()),
+ Pair.of(AvroLogicalTypeEnum.LOCAL_TIMESTAMP_MICROS.getValue(), new
LocalTimestampMicroLogicalTypeProcessor()),
+ Pair.of(AvroLogicalTypeEnum.LOCAL_TIMESTAMP_MILLIS.getValue(), new
LocalTimestampMilliLogicalTypeProcessor()),
+ Pair.of(AvroLogicalTypeEnum.TIMESTAMP_MICROS.getValue(), new
TimestampMicroLogicalTypeProcessor()),
+ Pair.of(AvroLogicalTypeEnum.TIMESTAMP_MILLIS.getValue(), new
TimestampMilliLogicalTypeProcessor()),
+ Pair.of(AvroLogicalTypeEnum.DURATION.getValue(), new
DurationLogicalTypeProcessor()),
+ Pair.of(AvroLogicalTypeEnum.UUID.getValue(),
generateStringTypeHandler()));
+ }
+
+ private static class DecimalLogicalTypeProcessor extends
JsonToAvroFieldProcessor {
@Override
public Pair<Boolean, Object> convert(Object value, String name, Schema
schema, boolean shouldSanitize, String invalidCharMask) {
- if (value instanceof Boolean) {
- return Pair.of(true, value);
+
+ if (!isValidDecimalTypeConfig(schema)) {
+ return Pair.of(false, null);
+ }
+
+ // Case 1: Input is a list. It is expected to be raw Fixed byte array
input, and we only support
+ // parsing it to Fixed avro type.
+ if (value instanceof List<?> && schema.getType() == Type.FIXED) {
+ JsonToAvroFieldProcessor processor = generateFixedTypeHandler();
+ return processor.convert(value, name, schema, shouldSanitize,
invalidCharMask);
+ }
+
+ // Case 2: Input is a number or String number.
+ LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal)
schema.getLogicalType();
+ Pair<Boolean, BigDecimal> parseResult = parseObjectToBigDecimal(value);
+ if (Boolean.FALSE.equals(parseResult.getLeft())) {
+ return Pair.of(false, null);
+ }
+ BigDecimal bigDecimal = parseResult.getRight();
+
+ // As we don't do rounding, the validation will enforce the scale part
and the integer part are all within the
+ // limit. As a result, if scale is 2 precision is 5, we only allow 3
digits for the integer.
+ // Allowed: 123.45, 123, 0.12
+ // Disallowed: 1234 (4 digit integer while the scale has already
reserved 2 digit out of the 5 digit precision)
+ // 123456, 0.12345
+ if (bigDecimal.scale() > decimalType.getScale()
+ || (bigDecimal.precision() - bigDecimal.scale()) >
(decimalType.getPrecision() - decimalType.getScale())) {
+ // Correspond to case
+ // org.apache.avro.AvroTypeException: Cannot encode decimal with
scale 5 as scale 2 without rounding.
+ // org.apache.avro.AvroTypeException: Cannot encode decimal with
scale 3 as scale 2 without rounding
+ return Pair.of(false, null);
+ }
+
+ switch (schema.getType()) {
+ case BYTES:
+ // Convert to primitive Avro type that logical type Decimal uses.
+ ByteBuffer byteBuffer = new
Conversions.DecimalConversion().toBytes(bigDecimal, schema, decimalType);
+ return Pair.of(true, byteBuffer);
+ case FIXED:
+ GenericFixed fixedValue = new
Conversions.DecimalConversion().toFixed(bigDecimal, schema, decimalType);
+ return Pair.of(true, fixedValue);
+ default: {
+ return Pair.of(false, null);
+ }
}
- return Pair.of(false, null);
}
- };
- }
- private static JsonToAvroFieldProcessor generateIntTypeHandler() {
- return new JsonToAvroFieldProcessor() {
- @Override
- public Pair<Boolean, Object> convert(Object value, String name, Schema
schema, boolean shouldSanitize, String invalidCharMask) {
- if (value instanceof Number) {
- return Pair.of(true, ((Number) value).intValue());
- } else if (value instanceof String) {
- return Pair.of(true, Integer.valueOf((String) value));
+ /**
+ * Check if the given schema is a valid decimal type configuration.
+ */
+ private static boolean isValidDecimalTypeConfig(Schema schema) {
+ LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal)
schema.getLogicalType();
+ // When the logical type is found to be invalid while the schema is parsed,
Avro's Schema.parse silently
+ // sets the logical type to null instead of throwing an exception.
Correspondingly, we just check if it is null here.
+ if (decimalType == null) {
+ return false;
}
- return Pair.of(false, null);
+ // Even though schema is validated at schema parsing phase, still
validate here to be defensive.
+ decimalType.validate(schema);
+ return true;
}
- };
- }
- private static JsonToAvroFieldProcessor generateDoubleTypeHandler() {
- return new JsonToAvroFieldProcessor() {
- @Override
- public Pair<Boolean, Object> convert(Object value, String name, Schema
schema, boolean shouldSanitize, String invalidCharMask) {
- if (value instanceof Number) {
- return Pair.of(true, ((Number) value).doubleValue());
- } else if (value instanceof String) {
- return Pair.of(true, Double.valueOf((String) value));
+ /**
+ * Parse the object to BigDecimal.
+ *
+ * @param obj Object to be parsed
+ * @return Pair object, with left as boolean indicating if the parsing
was successful and right as the
+ * BigDecimal value.
+ */
+ private static Pair<Boolean, BigDecimal> parseObjectToBigDecimal(Object
obj) {
+ // Case 1: Object is a number.
+ if (obj instanceof Number) {
+ return Pair.of(true, BigDecimal.valueOf(((Number)
obj).doubleValue()));
+ }
+
+ // Case 2: Object is a number in String format.
+ if (obj instanceof String) {
+ BigDecimal bigDecimal = null;
+ try {
+ bigDecimal = new BigDecimal(((String) obj));
+ } catch (java.lang.NumberFormatException ignored) {
+ /* ignore */
+ }
+ return Pair.of(bigDecimal != null, bigDecimal);
}
return Pair.of(false, null);
}
- };
- }
+ }
- private static JsonToAvroFieldProcessor generateFloatTypeHandler() {
- return new JsonToAvroFieldProcessor() {
+ private static class DurationLogicalTypeProcessor extends
JsonToAvroFieldProcessor {
+ private static final int NUM_ELEMENTS_FOR_DURATION_TYPE = 3;
+
+ /**
+ * We expect the input to be a list of 3 integers representing months,
days and milliseconds.
+ */
+ private boolean isValidDurationInput(Object value) {
+ if (!(value instanceof List<?>)) {
+ return false;
+ }
+ List<?> list = (List<?>) value;
+ if (list.size() != NUM_ELEMENTS_FOR_DURATION_TYPE) {
+ return false;
+ }
+ for (Object element : list) {
+ if (!(element instanceof Integer)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Convert the given object to Avro object with schema whose logical
type is duration.
+ */
@Override
public Pair<Boolean, Object> convert(Object value, String name, Schema
schema, boolean shouldSanitize, String invalidCharMask) {
- if (value instanceof Number) {
- return Pair.of(true, ((Number) value).floatValue());
- } else if (value instanceof String) {
- return Pair.of(true, Float.valueOf((String) value));
+
+ if (!isValidDurationTypeConfig(schema)) {
+ return Pair.of(false, null);
}
- return Pair.of(false, null);
+ if (!isValidDurationInput(value)) {
+ return Pair.of(false, null);
+ }
+ // After the validation the input can be safely cast to List<Integer>
with 3 elements.
+ List<?> list = (List<?>) value;
+ List<Integer> converval = list.stream()
+ .filter(Integer.class::isInstance)
+ .map(Integer.class::cast)
+ .collect(Collectors.toList());
+
+ ByteBuffer buffer =
ByteBuffer.allocate(schema.getFixedSize()).order(ByteOrder.LITTLE_ENDIAN);
+ for (Integer element : converval) {
+ buffer.putInt(element); // months
+ }
+ return Pair.of(true, new GenericData.Fixed(schema, buffer.array()));
+ }
+
+ /**
+ * Check if the given schema is a valid decimal type configuration.
+ */
+ private static boolean isValidDurationTypeConfig(Schema schema) {
+ String durationTypeName = AvroLogicalTypeEnum.DURATION.getValue();
+ LogicalType durationType = schema.getLogicalType();
+ String durationTypeProp = schema.getProp("logicalType");
+ // 1. The Avro type should be "Fixed".
+ // 2. Fixed size must be of 12 bytes as it hold 3 integers.
+ // 3. Logical type name should be "duration". The name might be stored
in different places based on Avro version
+ // being used here.
+ return schema.getType().equals(Type.FIXED)
+ && schema.getFixedSize() == Integer.BYTES *
NUM_ELEMENTS_FOR_DURATION_TYPE
+ && (durationType != null &&
durationType.getName().equals(durationTypeName)
+ || durationTypeProp != null &&
durationTypeProp.equals(durationTypeName));
+ }
+ }
+
+ /**
+ * Processor utility handling Number inputs. Consumed by
TimeLogicalTypeProcessor.
+ */
+ private interface NumericParser {
+ // Convert the input number to Avro data type according to the class
+ // implementing this interface.
+ Pair<Boolean, Object> handleNumberValue(Number value);
+
+ // Convert the input number to Avro data type according to the class
+ // implementing this interface.
+ // @param value the input number in string format.
+ Pair<Boolean, Object> handleStringNumber(String value);
+
+ interface IntParser extends NumericParser {
+ @Override
+ default Pair<Boolean, Object> handleNumberValue(Number value) {
+ return Pair.of(true, value.intValue());
+ }
+
+ @Override
+ default Pair<Boolean, Object> handleStringNumber(String value) {
+ return Pair.of(true, Integer.parseInt(value));
+ }
+ }
+
+ interface LongParser extends NumericParser {
+ @Override
+ default Pair<Boolean, Object> handleNumberValue(Number value) {
+ return Pair.of(true, value.longValue());
+ }
+
+ @Override
+ default Pair<Boolean, Object> handleStringNumber(String value) {
+ return Pair.of(true, Long.parseLong(value));
+ }
+ }
+ }
+
+ /**
+ * Base Class for converting object to avro logical type
TimeMilli/TimeMicro.
+ */
+ private abstract static class TimeLogicalTypeProcessor extends
JsonToAvroFieldProcessor implements NumericParser {
+
+ protected static final LocalDateTime LOCAL_UNIX_EPOCH =
LocalDateTime.of(1970, 1, 1, 0, 0, 0, 0);
+
+ // Logical type the processor is handling.
+ private final AvroLogicalTypeEnum logicalTypeEnum;
+
+ public TimeLogicalTypeProcessor(AvroLogicalTypeEnum logicalTypeEnum) {
+ this.logicalTypeEnum = logicalTypeEnum;
}
- };
- }
- private static JsonToAvroFieldProcessor generateLongTypeHandler() {
- return new JsonToAvroFieldProcessor() {
+ /**
+ * Main function that convert input to Object with java data type
specified by schema
+ */
@Override
public Pair<Boolean, Object> convert(Object value, String name, Schema
schema, boolean shouldSanitize, String invalidCharMask) {
+ LogicalType logicalType = schema.getLogicalType();
+ if (logicalType == null) {
+ return Pair.of(false, null);
+ }
+ logicalType.validate(schema);
if (value instanceof Number) {
- return Pair.of(true, ((Number) value).longValue());
- } else if (value instanceof String) {
- return Pair.of(true, Long.valueOf((String) value));
+ return handleNumberValue((Number) value);
+ }
+ if (value instanceof String) {
+ String valStr = (String) value;
+ if (ALL_DIGITS_WITH_OPTIONAL_SIGN.matcher(valStr).matches()) {
+ return handleStringNumber(valStr);
+ } else if (isWellFormedDateTime(valStr)) {
+ return handleStringValue(valStr);
+ }
}
return Pair.of(false, null);
}
- };
- }
- private static JsonToAvroFieldProcessor generateStringTypeHandler() {
- return new JsonToAvroFieldProcessor() {
+ // Handle the case when the input is a string that may be parsed as a
time.
+ protected abstract Pair<Boolean, Object> handleStringValue(String value);
+
+ protected DateTimeFormatter getDateTimeFormatter() {
+ DateTimeParseContext ctx =
DATE_TIME_PARSE_CONTEXT_MAP.get(logicalTypeEnum);
+ assert ctx != null : String.format("%s should have configured date
time context.", logicalTypeEnum.getValue());
+ return ctx.dateTimeFormatter;
+ }
+
+ protected Pattern getDateTimePattern() {
+ DateTimeParseContext ctx =
DATE_TIME_PARSE_CONTEXT_MAP.get(logicalTypeEnum);
+ assert ctx != null : String.format("%s should have configured date
time context.", logicalTypeEnum.getValue());
+ return ctx.dateTimePattern;
+ }
+
+ // Depending on the logical type the processor handles, they use
different parsing context
+ // when they need to parse a timestamp string in handleStringValue.
+ private static class DateTimeParseContext {
+ public DateTimeParseContext(DateTimeFormatter dateTimeFormatter,
Pattern dateTimePattern) {
+ this.dateTimeFormatter = dateTimeFormatter;
+ this.dateTimePattern = dateTimePattern;
+ }
+
+ public final Pattern dateTimePattern;
+
+ public final DateTimeFormatter dateTimeFormatter;
+ }
+
+ private static final Map<AvroLogicalTypeEnum, DateTimeParseContext>
DATE_TIME_PARSE_CONTEXT_MAP = getParseContext();
+
+ private static Map<AvroLogicalTypeEnum, DateTimeParseContext>
getParseContext() {
+ // Formatter for parsing local timestamp. It ignores the time zone
info of the timestamp.
+ // No pattern is defined as ISO_INSTANT format internal is not clear.
+ DateTimeParseContext dateTimestampParseContext = new
DateTimeParseContext(
+ DateTimeFormatter.ISO_INSTANT,
+ null /* match everything*/);
+ // Formatter for parsing timestamp with time zone. The pattern is
derived from ISO_LOCAL_TIME definition.
+ // Pattern asserts the string is
+ // <optional sign><Hour>:<Minute> + optional <second> + optional
<fractional second>
+ DateTimeParseContext dateTimeParseContext = new DateTimeParseContext(
+ DateTimeFormatter.ISO_LOCAL_TIME,
+
Pattern.compile("^[+-]?\\d{2}:\\d{2}(?::\\d{2}(?:\\.\\d{1,9})?)?"));
+ // Formatter for parsing local time. The pattern is derived from
ISO_LOCAL_TIME definition.
+ // Pattern asserts the string is
+ // <optional sign><Year>-<Month>-<Day>T<Hour>:<Minute> + optional
<second> + optional <fractional second>
Review Comment:
```suggestion
// Formatter for parsing a timestamp in a local timezone. The pattern is derived from
// ISO_LOCAL_DATE_TIME definition.
// Pattern asserts the string is
// <optional sign><Year>-<Month>-<Day>T<Hour>:<Minute> + optional <second> + optional <fractional second>
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]