This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 8335ce2e96a [HUDI-8105] Refactor MercifulJsonConverter to be extensible for JSON-to-Row conversion (#11800)
8335ce2e96a is described below
commit 8335ce2e96a783483fac07639b8aff325cab9c43
Author: vamsikarnika <[email protected]>
AuthorDate: Tue Aug 27 15:27:42 2024 +0530
[HUDI-8105] Refactor MercifulJsonConverter to be extensible for JSON-to-Row conversion (#11800)
* Refactor MercifulJsonConverter
* Remove unused imports
* Fix lint issues
* Fix the exception caught for Spark 2.4
---------
Co-authored-by: Vamsi <[email protected]>
---
.../apache/hudi/avro/MercifulJsonConverter.java | 1042 ++++++--------------
.../avro/processors/DateLogicalTypeProcessor.java | 43 +
.../processors/DecimalLogicalTypeProcessor.java | 69 ++
.../processors/DurationLogicalTypeProcessor.java | 66 ++
.../hudi/avro/processors/EnumTypeProcessor.java | 33 +
.../hudi/avro/processors/FixedTypeProcessor.java | 38 +
.../hudi/avro/processors/JsonFieldProcessor.java | 39 +
.../LocalTimestampMicroLogicalTypeProcessor.java | 60 ++
.../LocalTimestampMilliLogicalTypeProcessor.java | 57 ++
.../org/apache/hudi/avro/processors/Parser.java | 83 ++
.../avro/processors/TimeLogicalTypeProcessor.java | 266 +++++
.../processors/TimeMicroLogicalTypeProcessor.java | 49 +
.../processors/TimeMilliLogicalTypeProcessor.java | 49 +
.../TimestampMicroLogicalTypeProcessor.java | 52 +
.../TimestampMilliLogicalTypeProcessor.java | 52 +
.../exception/HoodieJsonConversionException.java | 30 +
.../HoodieJsonToAvroConversionException.java | 33 +
.../hudi/avro/TestMercifulJsonConverter.java | 17 +-
18 files changed, 1347 insertions(+), 731 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/avro/MercifulJsonConverter.java
b/hudi-common/src/main/java/org/apache/hudi/avro/MercifulJsonConverter.java
index f738ab8e44a..5219a389662 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/MercifulJsonConverter.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/MercifulJsonConverter.java
@@ -18,11 +18,25 @@
package org.apache.hudi.avro;
+import org.apache.hudi.avro.processors.DateLogicalTypeProcessor;
+import org.apache.hudi.avro.processors.DecimalLogicalTypeProcessor;
+import org.apache.hudi.avro.processors.DurationLogicalTypeProcessor;
+import org.apache.hudi.avro.processors.EnumTypeProcessor;
+import org.apache.hudi.avro.processors.FixedTypeProcessor;
+import org.apache.hudi.avro.processors.JsonFieldProcessor;
+import org.apache.hudi.avro.processors.LocalTimestampMicroLogicalTypeProcessor;
+import org.apache.hudi.avro.processors.LocalTimestampMilliLogicalTypeProcessor;
+import org.apache.hudi.avro.processors.Parser;
+import org.apache.hudi.avro.processors.TimeMicroLogicalTypeProcessor;
+import org.apache.hudi.avro.processors.TimeMilliLogicalTypeProcessor;
+import org.apache.hudi.avro.processors.TimestampMicroLogicalTypeProcessor;
+import org.apache.hudi.avro.processors.TimestampMilliLogicalTypeProcessor;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieJsonConversionException;
+import org.apache.hudi.exception.HoodieJsonToAvroConversionException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.avro.Conversions;
@@ -35,17 +49,10 @@ import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.GenericRecord;
import java.io.IOException;
-import java.io.Serializable;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
-import java.time.Instant;
import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.LocalTime;
-import java.time.format.DateTimeFormatter;
-import java.time.format.DateTimeParseException;
-import java.time.temporal.ChronoField;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collections;
@@ -54,7 +61,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.regex.Pattern;
import java.util.stream.Collectors;
/**
@@ -64,11 +70,13 @@ public class MercifulJsonConverter {
// For each schema (keyed by full name), stores a mapping of schema field
name to json field name to account for sanitization of fields
private static final Map<String, Map<String, String>>
SANITIZED_FIELD_MAPPINGS = new ConcurrentHashMap<>();
+ private final Map<Schema.Type, JsonFieldProcessor> fieldTypeProcessorMap;
+ private final Map<String, JsonFieldProcessor> fieldLogicalTypeProcessorMap;
- private final ObjectMapper mapper;
+ protected final ObjectMapper mapper;
- private final String invalidCharMask;
- private final boolean shouldSanitize;
+ protected final String invalidCharMask;
+ protected final boolean shouldSanitize;
/**
* Uses a default objectMapper to deserialize a json string.
@@ -91,6 +99,8 @@ public class MercifulJsonConverter {
this.mapper = mapper;
this.shouldSanitize = shouldSanitize;
this.invalidCharMask = invalidCharMask;
+ this.fieldTypeProcessorMap = getFieldTypeProcessors();
+ this.fieldLogicalTypeProcessorMap = getLogicalFieldTypeProcessors();
}
/**
@@ -104,9 +114,9 @@ public class MercifulJsonConverter {
public GenericRecord convert(String json, Schema schema) {
try {
Map<String, Object> jsonObjectMap = mapper.readValue(json, Map.class);
- return convertJsonToAvro(jsonObjectMap, schema, shouldSanitize,
invalidCharMask);
- } catch (IOException e) {
- throw new HoodieIOException(e.getMessage(), e);
+ return convertJsonToAvro(jsonObjectMap, schema);
+ } catch (HoodieException | IOException e) {
+ throw new HoodieJsonToAvroConversionException("failed to convert json to
avro", e);
}
}
@@ -118,18 +128,18 @@ public class MercifulJsonConverter {
SANITIZED_FIELD_MAPPINGS.remove(schemaFullName);
}
- private static GenericRecord convertJsonToAvro(Map<String, Object>
inputJson, Schema schema, boolean shouldSanitize, String invalidCharMask) {
+ private GenericRecord convertJsonToAvro(Map<String, Object> inputJson,
Schema schema) {
GenericRecord avroRecord = new GenericData.Record(schema);
for (Schema.Field f : schema.getFields()) {
Object val = shouldSanitize ? getFieldFromJson(f, inputJson,
schema.getFullName(), invalidCharMask) : inputJson.get(f.name());
if (val != null) {
- avroRecord.put(f.pos(), convertJsonToAvroField(val, f.name(),
f.schema(), shouldSanitize, invalidCharMask));
+ avroRecord.put(f.pos(), convertJsonField(val, f.name(), f.schema()));
}
}
return avroRecord;
}
- private static Object getFieldFromJson(final Schema.Field fieldSchema, final
Map<String, Object> inputJson, final String schemaFullName, final String
invalidCharMask) {
+ protected static Object getFieldFromJson(final Schema.Field fieldSchema,
final Map<String, Object> inputJson, final String schemaFullName, final String
invalidCharMask) {
Map<String, String> schemaToJsonFieldNames =
SANITIZED_FIELD_MAPPINGS.computeIfAbsent(schemaFullName, unused -> new
ConcurrentHashMap<>());
if (!schemaToJsonFieldNames.containsKey(fieldSchema.name())) {
// if we don't have field mapping, proactively populate as many as
possible based on input json
@@ -154,19 +164,19 @@ public class MercifulJsonConverter {
return null;
}
- private static Schema getNonNull(Schema schema) {
+ private Schema getNonNull(Schema schema) {
List<Schema> types = schema.getTypes();
Schema.Type firstType = types.get(0).getType();
return firstType.equals(Schema.Type.NULL) ? types.get(1) : types.get(0);
}
- private static boolean isOptional(Schema schema) {
+ private boolean isOptional(Schema schema) {
return schema.getType().equals(Schema.Type.UNION) &&
schema.getTypes().size() == 2
&& (schema.getTypes().get(0).getType().equals(Schema.Type.NULL)
|| schema.getTypes().get(1).getType().equals(Schema.Type.NULL));
}
- private static Object convertJsonToAvroField(Object value, String name,
Schema schema, boolean shouldSanitize, String invalidCharMask) {
+ protected Object convertJsonField(Object value, String name, Schema schema) {
if (isOptional(schema)) {
if (value == null) {
@@ -176,790 +186,376 @@ public class MercifulJsonConverter {
}
} else if (value == null) {
// Always fail on null for non-nullable schemas
- throw new HoodieJsonToAvroConversionException(null, name, schema,
shouldSanitize, invalidCharMask);
+ throw buildConversionException(String.format("Symbol %s not in enum",
value.toString()),
+ schema.getFullName(), schema, shouldSanitize, invalidCharMask);
}
- return JsonToAvroFieldProcessorUtil.convertToAvro(value, name, schema,
shouldSanitize, invalidCharMask);
+ return convertField(value, name, schema);
}
- private static class JsonToAvroFieldProcessorUtil {
- /**
- * Base Class for converting json to avro fields.
- */
- private abstract static class JsonToAvroFieldProcessor implements
Serializable {
-
- public Object convertToAvro(Object value, String name, Schema schema,
boolean shouldSanitize, String invalidCharMask) {
- Pair<Boolean, Object> res = convert(value, name, schema,
shouldSanitize, invalidCharMask);
- if (!res.getLeft()) {
- throw new HoodieJsonToAvroConversionException(value, name, schema,
shouldSanitize, invalidCharMask);
- }
- return res.getRight();
- }
+ private Object convertField(Object value, String name, Schema schema) {
+ JsonFieldProcessor processor = getProcessorForSchema(schema);
+ return processor.convertField(value, name, schema);
+ }
- protected abstract Pair<Boolean, Object> convert(Object value, String
name, Schema schema, boolean shouldSanitize, String invalidCharMask);
- }
+ protected JsonFieldProcessor getProcessorForSchema(Schema schema) {
+ JsonFieldProcessor processor = null;
- public static Object convertToAvro(Object value, String name, Schema
schema, boolean shouldSanitize, String invalidCharMask) {
- JsonToAvroFieldProcessor processor = getProcessorForSchema(schema);
- return processor.convertToAvro(value, name, schema, shouldSanitize,
invalidCharMask);
+ // 3 cases to consider: customized logicalType, logicalType, and type.
+ String customizedLogicalType = schema.getProp("logicalType");
+ LogicalType logicalType = schema.getLogicalType();
+ Type type = schema.getType();
+ if (customizedLogicalType != null && !customizedLogicalType.isEmpty()) {
+ processor = fieldLogicalTypeProcessorMap.get(customizedLogicalType);
+ } else if (logicalType != null) {
+ processor = fieldLogicalTypeProcessorMap.get(logicalType.getName());
+ } else {
+ processor = fieldTypeProcessorMap.get(type);
}
- private static JsonToAvroFieldProcessor getProcessorForSchema(Schema
schema) {
- JsonToAvroFieldProcessor processor = null;
-
- // 3 cases to consider: customized logicalType, logicalType, and type.
- String customizedLogicalType = schema.getProp("logicalType");
- LogicalType logicalType = schema.getLogicalType();
- Type type = schema.getType();
- if (customizedLogicalType != null && !customizedLogicalType.isEmpty()) {
- processor =
AVRO_LOGICAL_TYPE_FIELD_PROCESSORS.get(customizedLogicalType);
- } else if (logicalType != null) {
- processor =
AVRO_LOGICAL_TYPE_FIELD_PROCESSORS.get(logicalType.getName());
- } else {
- processor = AVRO_TYPE_FIELD_TYPE_PROCESSORS.get(type);
- }
+ ValidationUtils.checkArgument(
+ processor != null, String.format("JsonConverter cannot handle type:
%s", type));
+ return processor;
+ }
- ValidationUtils.checkArgument(
- processor != null, String.format("JsonConverter cannot handle type:
%s", type));
- return processor;
- }
+ /**
+ * Build type processor map for each avro type.
+ */
+ private Map<Schema.Type, JsonFieldProcessor> getFieldTypeProcessors() {
+ Map<Schema.Type, JsonFieldProcessor> fieldTypeProcessors = new
EnumMap<>(Schema.Type.class);
+ fieldTypeProcessors.put(Type.STRING, generateStringTypeHandler());
+ fieldTypeProcessors.put(Type.BOOLEAN, generateBooleanTypeHandler());
+ fieldTypeProcessors.put(Type.DOUBLE, generateDoubleTypeHandler());
+ fieldTypeProcessors.put(Type.FLOAT, generateFloatTypeHandler());
+ fieldTypeProcessors.put(Type.INT, generateIntTypeHandler());
+ fieldTypeProcessors.put(Type.LONG, generateLongTypeHandler());
+ fieldTypeProcessors.put(Type.ARRAY, generateArrayTypeHandler());
+ fieldTypeProcessors.put(Type.RECORD, generateRecordTypeHandler());
+ fieldTypeProcessors.put(Type.ENUM, generateEnumTypeHandler());
+ fieldTypeProcessors.put(Type.MAP, generateMapTypeHandler());
+ fieldTypeProcessors.put(Type.BYTES, generateBytesTypeHandler());
+ fieldTypeProcessors.put(Type.FIXED, generateFixedTypeHandler());
+ return Collections.unmodifiableMap(fieldTypeProcessors);
+ }
- // Avro primitive and complex type processors.
- private static final Map<Schema.Type, JsonToAvroFieldProcessor>
AVRO_TYPE_FIELD_TYPE_PROCESSORS = getFieldTypeProcessors();
- // Avro logical type processors.
- private static final Map<String, JsonToAvroFieldProcessor>
AVRO_LOGICAL_TYPE_FIELD_PROCESSORS = getLogicalFieldTypeProcessors();
+ private Map<String, JsonFieldProcessor> getLogicalFieldTypeProcessors() {
+ return CollectionUtils.createImmutableMap(
+ Pair.of(AvroLogicalTypeEnum.DECIMAL.getValue(),
generateDecimalLogicalTypeHandler()),
+ Pair.of(AvroLogicalTypeEnum.TIME_MICROS.getValue(),
generateTimeMicroLogicalTypeHandler()),
+ Pair.of(AvroLogicalTypeEnum.TIME_MILLIS.getValue(),
generateTimeMilliLogicalTypeHandler()),
+ Pair.of(AvroLogicalTypeEnum.DATE.getValue(),
generateDateLogicalTypeHandler()),
+ Pair.of(AvroLogicalTypeEnum.LOCAL_TIMESTAMP_MICROS.getValue(),
generateLocalTimeStampMicroLogicalTypeHandler()),
+ Pair.of(AvroLogicalTypeEnum.LOCAL_TIMESTAMP_MILLIS.getValue(),
generateLocalTimeStampMilliLogicalTypeHandler()),
+ Pair.of(AvroLogicalTypeEnum.TIMESTAMP_MICROS.getValue(),
generateTimestampMicroLogicalTypeHandler()),
+ Pair.of(AvroLogicalTypeEnum.TIMESTAMP_MILLIS.getValue(),
generateTimestampMilliLogicalTypeHandler()),
+ Pair.of(AvroLogicalTypeEnum.DURATION.getValue(),
generateDurationLogicalTypeHandler()),
+ Pair.of(AvroLogicalTypeEnum.UUID.getValue(),
generateStringTypeHandler()));
+ }
- /**
- * Build type processor map for each avro type.
- */
- private static Map<Schema.Type, JsonToAvroFieldProcessor>
getFieldTypeProcessors() {
- Map<Schema.Type, JsonToAvroFieldProcessor> fieldTypeProcessors = new
EnumMap<>(Schema.Type.class);
- fieldTypeProcessors.put(Type.STRING, generateStringTypeHandler());
- fieldTypeProcessors.put(Type.BOOLEAN, generateBooleanTypeHandler());
- fieldTypeProcessors.put(Type.DOUBLE, generateDoubleTypeHandler());
- fieldTypeProcessors.put(Type.FLOAT, generateFloatTypeHandler());
- fieldTypeProcessors.put(Type.INT, generateIntTypeHandler());
- fieldTypeProcessors.put(Type.LONG, generateLongTypeHandler());
- fieldTypeProcessors.put(Type.ARRAY, generateArrayTypeHandler());
- fieldTypeProcessors.put(Type.RECORD, generateRecordTypeHandler());
- fieldTypeProcessors.put(Type.ENUM, generateEnumTypeHandler());
- fieldTypeProcessors.put(Type.MAP, generateMapTypeHandler());
- fieldTypeProcessors.put(Type.BYTES, generateBytesTypeHandler());
- fieldTypeProcessors.put(Type.FIXED, generateFixedTypeHandler());
- return Collections.unmodifiableMap(fieldTypeProcessors);
- }
+ protected JsonFieldProcessor generateDecimalLogicalTypeHandler() {
+ return new DecimalToAvroLogicalTypeProcessor();
+ }
- private static Map<String, JsonToAvroFieldProcessor>
getLogicalFieldTypeProcessors() {
- return CollectionUtils.createImmutableMap(
- Pair.of(AvroLogicalTypeEnum.DECIMAL.getValue(), new
DecimalLogicalTypeProcessor()),
- Pair.of(AvroLogicalTypeEnum.TIME_MICROS.getValue(), new
TimeMicroLogicalTypeProcessor()),
- Pair.of(AvroLogicalTypeEnum.TIME_MILLIS.getValue(), new
TimeMilliLogicalTypeProcessor()),
- Pair.of(AvroLogicalTypeEnum.DATE.getValue(), new
DateLogicalTypeProcessor()),
- Pair.of(AvroLogicalTypeEnum.LOCAL_TIMESTAMP_MICROS.getValue(), new
LocalTimestampMicroLogicalTypeProcessor()),
- Pair.of(AvroLogicalTypeEnum.LOCAL_TIMESTAMP_MILLIS.getValue(), new
LocalTimestampMilliLogicalTypeProcessor()),
- Pair.of(AvroLogicalTypeEnum.TIMESTAMP_MICROS.getValue(), new
TimestampMicroLogicalTypeProcessor()),
- Pair.of(AvroLogicalTypeEnum.TIMESTAMP_MILLIS.getValue(), new
TimestampMilliLogicalTypeProcessor()),
- Pair.of(AvroLogicalTypeEnum.DURATION.getValue(), new
DurationLogicalTypeProcessor()),
- Pair.of(AvroLogicalTypeEnum.UUID.getValue(),
generateStringTypeHandler()));
- }
+ protected JsonFieldProcessor generateTimeMicroLogicalTypeHandler() {
+ return new TimeMicroLogicalTypeProcessor();
+ }
- private static class DecimalLogicalTypeProcessor extends
JsonToAvroFieldProcessor {
- @Override
- public Pair<Boolean, Object> convert(Object value, String name, Schema
schema, boolean shouldSanitize, String invalidCharMask) {
+ protected JsonFieldProcessor generateTimeMilliLogicalTypeHandler() {
+ return new TimeMilliLogicalTypeProcessor();
+ }
- if (!isValidDecimalTypeConfig(schema)) {
- return Pair.of(false, null);
- }
+ protected JsonFieldProcessor generateDateLogicalTypeHandler() {
+ return new DateToAvroLogicalTypeProcessor();
+ }
- // Case 1: Input is a list. It is expected to be raw Fixed byte array
input, and we only support
- // parsing it to Fixed avro type.
- if (value instanceof List<?> && schema.getType() == Type.FIXED) {
- JsonToAvroFieldProcessor processor = generateFixedTypeHandler();
- return processor.convert(value, name, schema, shouldSanitize,
invalidCharMask);
- }
+ protected JsonFieldProcessor generateLocalTimeStampMicroLogicalTypeHandler()
{
+ return new LocalTimestampMicroLogicalTypeProcessor();
+ }
- // Case 2: Input is a number or String number.
- LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal)
schema.getLogicalType();
- Pair<Boolean, BigDecimal> parseResult = parseObjectToBigDecimal(value);
- if (Boolean.FALSE.equals(parseResult.getLeft())) {
- return Pair.of(false, null);
- }
- BigDecimal bigDecimal = parseResult.getRight();
-
- // As we don't do rounding, the validation will enforce the scale part
and the integer part are all within the
- // limit. As a result, if scale is 2 precision is 5, we only allow 3
digits for the integer.
- // Allowed: 123.45, 123, 0.12
- // Disallowed: 1234 (4 digit integer while the scale has already
reserved 2 digit out of the 5 digit precision)
- // 123456, 0.12345
- if (bigDecimal.scale() > decimalType.getScale()
- || (bigDecimal.precision() - bigDecimal.scale()) >
(decimalType.getPrecision() - decimalType.getScale())) {
- // Correspond to case
- // org.apache.avro.AvroTypeException: Cannot encode decimal with
scale 5 as scale 2 without rounding.
- // org.apache.avro.AvroTypeException: Cannot encode decimal with
scale 3 as scale 2 without rounding
- return Pair.of(false, null);
- }
+ protected JsonFieldProcessor generateLocalTimeStampMilliLogicalTypeHandler()
{
+ return new LocalTimestampMilliLogicalTypeProcessor();
+ }
- switch (schema.getType()) {
- case BYTES:
- // Convert to primitive Arvo type that logical type Decimal uses.
- ByteBuffer byteBuffer = new
Conversions.DecimalConversion().toBytes(bigDecimal, schema, decimalType);
- return Pair.of(true, byteBuffer);
- case FIXED:
- GenericFixed fixedValue = new
Conversions.DecimalConversion().toFixed(bigDecimal, schema, decimalType);
- return Pair.of(true, fixedValue);
- default: {
- return Pair.of(false, null);
- }
- }
- }
+ protected JsonFieldProcessor generateTimestampMicroLogicalTypeHandler() {
+ return new TimestampMicroLogicalTypeProcessor();
+ }
- /**
- * Check if the given schema is a valid decimal type configuration.
- */
- private static boolean isValidDecimalTypeConfig(Schema schema) {
- LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal)
schema.getLogicalType();
- // At the time when the schema is found not valid when it is parsed,
the Avro Schema.parse will just silently
- // set the schema to be null instead of throwing exceptions.
Correspondingly, we just check if it is null here.
- if (decimalType == null) {
- return false;
- }
- // Even though schema is validated at schema parsing phase, still
validate here to be defensive.
- decimalType.validate(schema);
- return true;
- }
+ protected JsonFieldProcessor generateTimestampMilliLogicalTypeHandler() {
+ return new TimestampMilliLogicalTypeProcessor();
+ }
- /**
- * Parse the object to BigDecimal.
- *
- * @param obj Object to be parsed
- * @return Pair object, with left as boolean indicating if the parsing
was successful and right as the
- * BigDecimal value.
- */
- private static Pair<Boolean, BigDecimal> parseObjectToBigDecimal(Object
obj) {
- // Case 1: Object is a number.
- if (obj instanceof Number) {
- return Pair.of(true, BigDecimal.valueOf(((Number)
obj).doubleValue()));
- }
+ protected JsonFieldProcessor generateDurationLogicalTypeHandler() {
+ return new DurationToAvroLogicalTypeProcessor();
+ }
- // Case 2: Object is a number in String format.
- if (obj instanceof String) {
- BigDecimal bigDecimal = null;
- try {
- bigDecimal = new BigDecimal(((String) obj));
- } catch (java.lang.NumberFormatException ignored) {
- /* ignore */
- }
- return Pair.of(bigDecimal != null, bigDecimal);
- }
+ private class DecimalToAvroLogicalTypeProcessor extends
DecimalLogicalTypeProcessor {
+ @Override
+ public Pair<Boolean, Object> convert(Object value, String name, Schema
schema) {
+ if (!isValidDecimalTypeConfig(schema)) {
return Pair.of(false, null);
}
- }
-
- private static class DurationLogicalTypeProcessor extends
JsonToAvroFieldProcessor {
- private static final int NUM_ELEMENTS_FOR_DURATION_TYPE = 3;
-
- /**
- * We expect the input to be a list of 3 integers representing months,
days and milliseconds.
- */
- private boolean isValidDurationInput(Object value) {
- if (!(value instanceof List<?>)) {
- return false;
- }
- List<?> list = (List<?>) value;
- if (list.size() != NUM_ELEMENTS_FOR_DURATION_TYPE) {
- return false;
- }
- for (Object element : list) {
- if (!(element instanceof Integer)) {
- return false;
- }
- }
- return true;
- }
-
- /**
- * Convert the given object to Avro object with schema whose logical
type is duration.
- */
- @Override
- public Pair<Boolean, Object> convert(Object value, String name, Schema
schema, boolean shouldSanitize, String invalidCharMask) {
- if (!isValidDurationTypeConfig(schema)) {
- return Pair.of(false, null);
- }
- if (!isValidDurationInput(value)) {
- return Pair.of(false, null);
- }
- // After the validation the input can be safely cast to List<Integer>
with 3 elements.
- List<?> list = (List<?>) value;
- List<Integer> converval = list.stream()
- .filter(Integer.class::isInstance)
- .map(Integer.class::cast)
- .collect(Collectors.toList());
-
- ByteBuffer buffer =
ByteBuffer.allocate(schema.getFixedSize()).order(ByteOrder.LITTLE_ENDIAN);
- for (Integer element : converval) {
- buffer.putInt(element); // months
- }
- return Pair.of(true, new GenericData.Fixed(schema, buffer.array()));
+ // Case 1: Input is a list. It is expected to be raw Fixed byte array
input, and we only support
+ // parsing it to Fixed avro type.
+ if (value instanceof List<?> && schema.getType() == Type.FIXED) {
+ JsonFieldProcessor processor = generateFixedTypeHandler();
+ return processor.convert(value, name, schema);
}
- /**
- * Check if the given schema is a valid decimal type configuration.
- */
- private static boolean isValidDurationTypeConfig(Schema schema) {
- String durationTypeName = AvroLogicalTypeEnum.DURATION.getValue();
- LogicalType durationType = schema.getLogicalType();
- String durationTypeProp = schema.getProp("logicalType");
- // 1. The Avro type should be "Fixed".
- // 2. Fixed size must be of 12 bytes as it hold 3 integers.
- // 3. Logical type name should be "duration". The name might be stored
in different places based on Avro version
- // being used here.
- return schema.getType().equals(Type.FIXED)
- && schema.getFixedSize() == Integer.BYTES *
NUM_ELEMENTS_FOR_DURATION_TYPE
- && (durationType != null &&
durationType.getName().equals(durationTypeName)
- || durationTypeProp != null &&
durationTypeProp.equals(durationTypeName));
+ // Case 2: Input is a number or String number.
+ LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal)
schema.getLogicalType();
+ Pair<Boolean, BigDecimal> parseResult = parseObjectToBigDecimal(value);
+ if (Boolean.FALSE.equals(parseResult.getLeft())) {
+ return Pair.of(false, null);
}
- }
-
- /**
- * Processor utility handling Number inputs. Consumed by
TimeLogicalTypeProcessor.
- */
- private interface NumericParser {
- // Convert the input number to Avro data type according to the class
- // implementing this interface.
- Pair<Boolean, Object> handleNumberValue(Number value);
-
- // Convert the input number to Avro data type according to the class
- // implementing this interface.
- // @param value the input number in string format.
- Pair<Boolean, Object> handleStringNumber(String value);
-
- interface IntParser extends NumericParser {
- @Override
- default Pair<Boolean, Object> handleNumberValue(Number value) {
- return Pair.of(true, value.intValue());
- }
-
- @Override
- default Pair<Boolean, Object> handleStringNumber(String value) {
- return Pair.of(true, Integer.parseInt(value));
- }
+ BigDecimal bigDecimal = parseResult.getRight();
+
+ // As we don't do rounding, the validation will enforce the scale part
and the integer part are all within the
+ // limit. As a result, if scale is 2 precision is 5, we only allow 3
digits for the integer.
+ // Allowed: 123.45, 123, 0.12
+ // Disallowed: 1234 (4 digit integer while the scale has already
reserved 2 digit out of the 5 digit precision)
+ // 123456, 0.12345
+ if (bigDecimal.scale() > decimalType.getScale()
+ || (bigDecimal.precision() - bigDecimal.scale()) >
(decimalType.getPrecision() - decimalType.getScale())) {
+ // Correspond to case
+ // org.apache.avro.AvroTypeException: Cannot encode decimal with scale
5 as scale 2 without rounding.
+ // org.apache.avro.AvroTypeException: Cannot encode decimal with scale
3 as scale 2 without rounding
+ return Pair.of(false, null);
}
- interface LongParser extends NumericParser {
- @Override
- default Pair<Boolean, Object> handleNumberValue(Number value) {
- return Pair.of(true, value.longValue());
- }
-
- @Override
- default Pair<Boolean, Object> handleStringNumber(String value) {
- return Pair.of(true, Long.parseLong(value));
+ switch (schema.getType()) {
+ case BYTES:
+ // Convert to primitive Arvo type that logical type Decimal uses.
+ ByteBuffer byteBuffer = new
Conversions.DecimalConversion().toBytes(bigDecimal, schema, decimalType);
+ return Pair.of(true, byteBuffer);
+ case FIXED:
+ GenericFixed fixedValue = new
Conversions.DecimalConversion().toFixed(bigDecimal, schema, decimalType);
+ return Pair.of(true, fixedValue);
+ default: {
+ return Pair.of(false, null);
}
}
}
+ }
+
+ private static class DurationToAvroLogicalTypeProcessor extends
DurationLogicalTypeProcessor {
/**
- * Base Class for converting object to avro logical type
TimeMilli/TimeMicro.
+ * Convert the given object to Avro object with schema whose logical type
is duration.
*/
- private abstract static class TimeLogicalTypeProcessor extends
JsonToAvroFieldProcessor implements NumericParser {
-
- protected static final LocalDateTime LOCAL_UNIX_EPOCH =
LocalDateTime.of(1970, 1, 1, 0, 0, 0, 0);
-
- // Logical type the processor is handling.
- private final AvroLogicalTypeEnum logicalTypeEnum;
-
- public TimeLogicalTypeProcessor(AvroLogicalTypeEnum logicalTypeEnum) {
- this.logicalTypeEnum = logicalTypeEnum;
- }
+ @Override
+ public Pair<Boolean, Object> convert(Object value, String name, Schema
schema) {
- /**
- * Main function that convert input to Object with java data type
specified by schema
- */
- @Override
- public Pair<Boolean, Object> convert(Object value, String name, Schema
schema, boolean shouldSanitize, String invalidCharMask) {
- LogicalType logicalType = schema.getLogicalType();
- if (logicalType == null) {
- return Pair.of(false, null);
- }
- logicalType.validate(schema);
- if (value instanceof Number) {
- return handleNumberValue((Number) value);
- }
- if (value instanceof String) {
- String valStr = (String) value;
- if (ALL_DIGITS_WITH_OPTIONAL_SIGN.matcher(valStr).matches()) {
- return handleStringNumber(valStr);
- } else if (isWellFormedDateTime(valStr)) {
- return handleStringValue(valStr);
- }
- }
+ if (!isValidDurationTypeConfig(schema)) {
return Pair.of(false, null);
}
-
- // Handle the case when the input is a string that may be parsed as a
time.
- protected abstract Pair<Boolean, Object> handleStringValue(String value);
-
- protected DateTimeFormatter getDateTimeFormatter() {
- DateTimeParseContext ctx =
DATE_TIME_PARSE_CONTEXT_MAP.get(logicalTypeEnum);
- assert ctx != null : String.format("%s should have configured date
time context.", logicalTypeEnum.getValue());
- return ctx.dateTimeFormatter;
- }
-
- protected Pattern getDateTimePattern() {
- DateTimeParseContext ctx =
DATE_TIME_PARSE_CONTEXT_MAP.get(logicalTypeEnum);
- assert ctx != null : String.format("%s should have configured date
time context.", logicalTypeEnum.getValue());
- return ctx.dateTimePattern;
- }
-
- // Depending on the logical type the processor handles, they use
different parsing context
- // when they need to parse a timestamp string in handleStringValue.
- private static class DateTimeParseContext {
- public DateTimeParseContext(DateTimeFormatter dateTimeFormatter,
Pattern dateTimePattern) {
- this.dateTimeFormatter = dateTimeFormatter;
- this.dateTimePattern = dateTimePattern;
- }
-
- public final Pattern dateTimePattern;
-
- public final DateTimeFormatter dateTimeFormatter;
- }
-
- private static final Map<AvroLogicalTypeEnum, DateTimeParseContext>
DATE_TIME_PARSE_CONTEXT_MAP = getParseContext();
-
- private static Map<AvroLogicalTypeEnum, DateTimeParseContext>
getParseContext() {
- // Formatter for parsing a timestamp. It assumes UTC timezone, with
'Z' as the zone ID.
- // No pattern is defined as ISO_INSTANT format internal is not clear.
- DateTimeParseContext dateTimestampParseContext = new
DateTimeParseContext(
- DateTimeFormatter.ISO_INSTANT,
- null /* match everything*/);
- // Formatter for parsing a time of day. The pattern is derived from
ISO_LOCAL_TIME definition.
- // Pattern asserts the string is
- // <optional sign><Hour>:<Minute> + optional <second> + optional
<fractional second>
- DateTimeParseContext dateTimeParseContext = new DateTimeParseContext(
- DateTimeFormatter.ISO_LOCAL_TIME,
-
Pattern.compile("^[+-]?\\d{2}:\\d{2}(?::\\d{2}(?:\\.\\d{1,9})?)?"));
- // Formatter for parsing a timestamp in a local timezone. The pattern
is derived from ISO_LOCAL_DATE_TIME definition.
- // Pattern asserts the string is
- // <optional sign><Year>-<Month>-<Day>T<Hour>:<Minute> + optional
<second> + optional <fractional second>
- DateTimeParseContext localTimestampParseContext = new
DateTimeParseContext(
- DateTimeFormatter.ISO_LOCAL_DATE_TIME,
-
Pattern.compile("^[+-]?\\d{4,10}-\\d{2}-\\d{2}T\\d{2}:\\d{2}(?::\\d{2}(?:\\.\\d{1,9})?)?")
- );
- // Formatter for parsing a date. The pattern is derived from
ISO_LOCAL_DATE definition.
- // Pattern asserts the string is
- // <optional sign><Year>-<Month>-<Day>
- DateTimeParseContext localDateParseContext = new DateTimeParseContext(
- DateTimeFormatter.ISO_LOCAL_DATE,
- Pattern.compile("^[+-]?\\d{4,10}-\\d{2}-\\d{2}?")
- );
-
- EnumMap<AvroLogicalTypeEnum, DateTimeParseContext> ctx = new
EnumMap<>(AvroLogicalTypeEnum.class);
- ctx.put(AvroLogicalTypeEnum.TIME_MICROS, dateTimeParseContext);
- ctx.put(AvroLogicalTypeEnum.TIME_MILLIS, dateTimeParseContext);
- ctx.put(AvroLogicalTypeEnum.DATE, localDateParseContext);
- ctx.put(AvroLogicalTypeEnum.LOCAL_TIMESTAMP_MICROS,
localTimestampParseContext);
- ctx.put(AvroLogicalTypeEnum.LOCAL_TIMESTAMP_MILLIS,
localTimestampParseContext);
- ctx.put(AvroLogicalTypeEnum.TIMESTAMP_MICROS,
dateTimestampParseContext);
- ctx.put(AvroLogicalTypeEnum.TIMESTAMP_MILLIS,
dateTimestampParseContext);
- return Collections.unmodifiableMap(ctx);
- }
-
- // Pattern validating if it is an number in string form.
- // Only check at most 19 digits as this is the max num of digits for
LONG.MAX_VALUE to contain the cost of regex matching.
- protected static final Pattern ALL_DIGITS_WITH_OPTIONAL_SIGN =
Pattern.compile("^[-+]?\\d{1,19}$");
-
- /**
- * Check if the given string is a well-formed date time string.
- * If no pattern is defined, it will always return true.
- */
- private boolean isWellFormedDateTime(String value) {
- Pattern pattern = getDateTimePattern();
- return pattern == null || pattern.matcher(value).matches();
- }
-
- protected Pair<Boolean, Instant> convertToInstantTime(String input) {
- // Parse the input timestamp, DateTimeFormatter.ISO_INSTANT is implied
here
- Instant time = null;
- try {
- time = Instant.parse(input);
- } catch (DateTimeParseException ignore) {
- /* ignore */
- }
- return Pair.of(time != null, time);
- }
-
- protected Pair<Boolean, LocalTime> convertToLocalTime(String input) {
- // Parse the input timestamp, DateTimeFormatter.ISO_LOCAL_TIME is
implied here
- LocalTime time = null;
- try {
- // Try parsing as an ISO date
- time = LocalTime.parse(input);
- } catch (DateTimeParseException ignore) {
- /* ignore */
- }
- return Pair.of(time != null, time);
+ if (!isValidDurationInput(value)) {
+ return Pair.of(false, null);
}
+ // After the validation the input can be safely cast to List<Integer>
with 3 elements.
+ List<?> list = (List<?>) value;
+ List<Integer> converval = list.stream()
+ .filter(Integer.class::isInstance)
+ .map(Integer.class::cast)
+ .collect(Collectors.toList());
- protected Pair<Boolean, LocalDateTime> convertToLocalDateTime(String
input) {
- // Parse the input timestamp, DateTimeFormatter.ISO_LOCAL_DATE_TIME is
implied here
- LocalDateTime time = null;
- try {
- // Try parsing as an ISO date
- time = LocalDateTime.parse(input, getDateTimeFormatter());
- } catch (DateTimeParseException ignore) {
- /* ignore */
- }
- return Pair.of(time != null, time);
+ ByteBuffer buffer =
ByteBuffer.allocate(schema.getFixedSize()).order(ByteOrder.LITTLE_ENDIAN);
+ for (Integer element : converval) {
+ buffer.putInt(element); // months
}
+ return Pair.of(true, new GenericData.Fixed(schema, buffer.array()));
}
+ }
- private static class DateLogicalTypeProcessor extends
TimeLogicalTypeProcessor
- implements NumericParser.IntParser {
- public DateLogicalTypeProcessor() {
- super(AvroLogicalTypeEnum.DATE);
- }
-
- @Override
- public Pair<Boolean, Object> handleStringValue(String value) {
- Pair<Boolean, LocalDate> result = convertToLocalDate(value);
- if (!result.getLeft()) {
- return Pair.of(false, null);
- }
- LocalDate date = result.getRight();
- int daysSinceEpoch = (int)
ChronoUnit.DAYS.between(LocalDate.ofEpochDay(0), date);
- return Pair.of(true, daysSinceEpoch);
- }
+ private static class DateToAvroLogicalTypeProcessor extends
DateLogicalTypeProcessor {
- private Pair<Boolean, LocalDate> convertToLocalDate(String input) {
- // Parse the input date string, DateTimeFormatter.ISO_LOCAL_DATE is
implied here
- LocalDate date = null;
- try {
- // Try parsing as an ISO date
- date = LocalDate.parse(input);
- } catch (DateTimeParseException ignore) {
- /* ignore */
- }
- return Pair.of(date != null, date);
- }
+ @Override
+ public Pair<Boolean, Object> convert(
+ Object value, String name, Schema schema) {
+ return convertCommon(
+ new Parser.IntParser() {
+ @Override
+ public Pair<Boolean, Object> handleStringValue(String value) {
+ if (!isWellFormedDateTime(value)) {
+ return Pair.of(false, null);
+ }
+ Pair<Boolean, LocalDate> result = convertToLocalDate(value);
+ if (!result.getLeft()) {
+ return Pair.of(false, null);
+ }
+ LocalDate date = result.getRight();
+ int daysSinceEpoch = (int)
ChronoUnit.DAYS.between(LocalDate.ofEpochDay(0), date);
+ return Pair.of(true, daysSinceEpoch);
+ }
+ },
+ value, schema);
}
+ }
- /**
- * Processor for TimeMilli logical type.
- */
- private static class TimeMilliLogicalTypeProcessor extends
TimeLogicalTypeProcessor
- implements NumericParser.IntParser {
- public TimeMilliLogicalTypeProcessor() {
- super(AvroLogicalTypeEnum.TIME_MILLIS);
- }
-
+ protected JsonFieldProcessor generateBooleanTypeHandler() {
+ return new JsonFieldProcessor() {
@Override
- public Pair<Boolean, Object> handleStringValue(String value) {
- Pair<Boolean, LocalTime> result = convertToLocalTime(value);
- if (!result.getLeft()) {
- return Pair.of(false, null);
+ public Pair<Boolean, Object> convert(Object value, String name, Schema
schema) {
+ if (value instanceof Boolean) {
+ return Pair.of(true, value);
}
- LocalTime time = result.getRight();
- Integer millisOfDay = time.toSecondOfDay() * 1000 + time.getNano() /
1000000;
- return Pair.of(true, millisOfDay);
- }
- }
-
- /**
- * Processor for TimeMicro logical type.
- */
- private static class TimeMicroLogicalTypeProcessor extends
TimeLogicalTypeProcessor
- implements NumericParser.LongParser {
- public TimeMicroLogicalTypeProcessor() {
- super(AvroLogicalTypeEnum.TIME_MICROS);
+ return Pair.of(false, null);
}
+ };
+ }
+ protected JsonFieldProcessor generateIntTypeHandler() {
+ return new JsonFieldProcessor() {
@Override
- public Pair<Boolean, Object> handleStringValue(String value) {
- Pair<Boolean, LocalTime> result = convertToLocalTime(value);
- if (!result.getLeft()) {
- return Pair.of(false, null);
+ public Pair<Boolean, Object> convert(Object value, String name, Schema
schema) {
+ if (value instanceof Number) {
+ return Pair.of(true, ((Number) value).intValue());
+ } else if (value instanceof String) {
+ return Pair.of(true, Integer.valueOf((String) value));
}
- LocalTime time = result.getRight();
- Long microsOfDay = (long) time.toSecondOfDay() * 1000000 +
time.getNano() / 1000;
- return Pair.of(true, microsOfDay);
- }
- }
-
- /**
- * Processor for TimeMicro logical type.
- */
- private static class LocalTimestampMicroLogicalTypeProcessor extends
TimeLogicalTypeProcessor
- implements NumericParser.LongParser {
- public LocalTimestampMicroLogicalTypeProcessor() {
- super(AvroLogicalTypeEnum.LOCAL_TIMESTAMP_MICROS);
+ return Pair.of(false, null);
}
+ };
+ }
+ protected JsonFieldProcessor generateDoubleTypeHandler() {
+ return new JsonFieldProcessor() {
@Override
- public Pair<Boolean, Object> handleStringValue(String value) {
- Pair<Boolean, LocalDateTime> result = convertToLocalDateTime(value);
- if (!result.getLeft()) {
- return Pair.of(false, null);
+ public Pair<Boolean, Object> convert(Object value, String name, Schema
schema) {
+ if (value instanceof Number) {
+ return Pair.of(true, ((Number) value).doubleValue());
+ } else if (value instanceof String) {
+ return Pair.of(true, Double.valueOf((String) value));
}
- LocalDateTime time = result.getRight();
-
- // Calculate the difference in milliseconds
- long diffInMicros = LOCAL_UNIX_EPOCH.until(time,
ChronoField.MICRO_OF_SECOND.getBaseUnit());
- return Pair.of(true, diffInMicros);
- }
- }
-
- /**
- * Processor for TimeMicro logical type.
- */
- private static class TimestampMicroLogicalTypeProcessor extends
TimeLogicalTypeProcessor
- implements NumericParser.LongParser {
- public TimestampMicroLogicalTypeProcessor() {
- super(AvroLogicalTypeEnum.TIMESTAMP_MICROS);
+ return Pair.of(false, null);
}
+ };
+ }
+ protected JsonFieldProcessor generateFloatTypeHandler() {
+ return new JsonFieldProcessor() {
@Override
- public Pair<Boolean, Object> handleStringValue(String value) {
- Pair<Boolean, Instant> result = convertToInstantTime(value);
- if (!result.getLeft()) {
- return Pair.of(false, null);
+ public Pair<Boolean, Object> convert(Object value, String name, Schema
schema) {
+ if (value instanceof Number) {
+ return Pair.of(true, ((Number) value).floatValue());
+ } else if (value instanceof String) {
+ return Pair.of(true, Float.valueOf((String) value));
}
- Instant time = result.getRight();
-
- // Calculate the difference in milliseconds
- long diffInMicro = Instant.EPOCH.until(time,
ChronoField.MICRO_OF_SECOND.getBaseUnit());
- return Pair.of(true, diffInMicro);
- }
- }
-
- /**
- * Processor for TimeMicro logical type.
- */
- private static class LocalTimestampMilliLogicalTypeProcessor extends
TimeLogicalTypeProcessor
- implements NumericParser.LongParser {
- public LocalTimestampMilliLogicalTypeProcessor() {
- super(AvroLogicalTypeEnum.LOCAL_TIMESTAMP_MILLIS);
+ return Pair.of(false, null);
}
+ };
+ }
+ protected JsonFieldProcessor generateLongTypeHandler() {
+ return new JsonFieldProcessor() {
@Override
- public Pair<Boolean, Object> handleStringValue(String value) {
- Pair<Boolean, LocalDateTime> result = convertToLocalDateTime(value);
- if (!result.getLeft()) {
- return Pair.of(false, null);
+ public Pair<Boolean, Object> convert(Object value, String name, Schema
schema) {
+ if (value instanceof Number) {
+ return Pair.of(true, ((Number) value).longValue());
+ } else if (value instanceof String) {
+ return Pair.of(true, Long.valueOf((String) value));
}
- LocalDateTime time = result.getRight();
-
- // Calculate the difference in milliseconds
- long diffInMillis = LOCAL_UNIX_EPOCH.until(time,
ChronoField.MILLI_OF_SECOND.getBaseUnit());
- return Pair.of(true, diffInMillis);
+ return Pair.of(false, null);
}
- }
+ };
+ }
- /**
- * Processor for TimeMicro logical type.
- */
- private static class TimestampMilliLogicalTypeProcessor extends
TimeLogicalTypeProcessor
- implements NumericParser.LongParser {
- public TimestampMilliLogicalTypeProcessor() {
- super(AvroLogicalTypeEnum.TIMESTAMP_MILLIS);
+ protected JsonFieldProcessor generateStringTypeHandler() {
+ return new JsonFieldProcessor() {
+ @Override
+ public Pair<Boolean, Object> convert(Object value, String name, Schema
schema) {
+ return Pair.of(true, value.toString());
}
+ };
+ }
+ protected JsonFieldProcessor generateBytesTypeHandler() {
+ return new JsonFieldProcessor() {
@Override
- public Pair<Boolean, Object> handleStringValue(String value) {
- Pair<Boolean, Instant> result = convertToInstantTime(value);
- if (!result.getLeft()) {
- return Pair.of(false, null);
- }
- Instant time = result.getRight();
-
- // Calculate the difference in milliseconds
- long diffInMillis = Instant.EPOCH.until(time,
ChronoField.MILLI_OF_SECOND.getBaseUnit());
- return Pair.of(true, diffInMillis);
+ public Pair<Boolean, Object> convert(Object value, String name, Schema
schema) {
+ // Should return ByteBuffer (see GenericData.isBytes())
+ return Pair.of(true, ByteBuffer.wrap(value.toString().getBytes()));
}
- }
-
- private static JsonToAvroFieldProcessor generateBooleanTypeHandler() {
- return new JsonToAvroFieldProcessor() {
- @Override
- public Pair<Boolean, Object> convert(Object value, String name, Schema
schema, boolean shouldSanitize, String invalidCharMask) {
- if (value instanceof Boolean) {
- return Pair.of(true, value);
- }
- return Pair.of(false, null);
- }
- };
- }
-
- private static JsonToAvroFieldProcessor generateIntTypeHandler() {
- return new JsonToAvroFieldProcessor() {
- @Override
- public Pair<Boolean, Object> convert(Object value, String name, Schema
schema, boolean shouldSanitize, String invalidCharMask) {
- if (value instanceof Number) {
- return Pair.of(true, ((Number) value).intValue());
- } else if (value instanceof String) {
- return Pair.of(true, Integer.valueOf((String) value));
- }
- return Pair.of(false, null);
- }
- };
- }
-
- private static JsonToAvroFieldProcessor generateDoubleTypeHandler() {
- return new JsonToAvroFieldProcessor() {
- @Override
- public Pair<Boolean, Object> convert(Object value, String name, Schema
schema, boolean shouldSanitize, String invalidCharMask) {
- if (value instanceof Number) {
- return Pair.of(true, ((Number) value).doubleValue());
- } else if (value instanceof String) {
- return Pair.of(true, Double.valueOf((String) value));
- }
- return Pair.of(false, null);
- }
- };
- }
-
- private static JsonToAvroFieldProcessor generateFloatTypeHandler() {
- return new JsonToAvroFieldProcessor() {
- @Override
- public Pair<Boolean, Object> convert(Object value, String name, Schema
schema, boolean shouldSanitize, String invalidCharMask) {
- if (value instanceof Number) {
- return Pair.of(true, ((Number) value).floatValue());
- } else if (value instanceof String) {
- return Pair.of(true, Float.valueOf((String) value));
- }
- return Pair.of(false, null);
- }
- };
- }
+ };
+ }
- private static JsonToAvroFieldProcessor generateLongTypeHandler() {
- return new JsonToAvroFieldProcessor() {
- @Override
- public Pair<Boolean, Object> convert(Object value, String name, Schema
schema, boolean shouldSanitize, String invalidCharMask) {
- if (value instanceof Number) {
- return Pair.of(true, ((Number) value).longValue());
- } else if (value instanceof String) {
- return Pair.of(true, Long.valueOf((String) value));
- }
- return Pair.of(false, null);
- }
- };
- }
+ protected JsonFieldProcessor generateFixedTypeHandler() {
+ return new AvroFixedTypeProcessor();
+ }
- private static JsonToAvroFieldProcessor generateStringTypeHandler() {
- return new JsonToAvroFieldProcessor() {
- @Override
- public Pair<Boolean, Object> convert(Object value, String name, Schema
schema, boolean shouldSanitize, String invalidCharMask) {
- return Pair.of(true, value.toString());
- }
- };
+ private static class AvroFixedTypeProcessor extends FixedTypeProcessor {
+ @Override
+ public Pair<Boolean, Object> convert(Object value, String name, Schema
schema) {
+ return Pair.of(true, new GenericData.Fixed(
+ schema, convertToJavaObject(value, name, schema)));
}
+ }
- private static JsonToAvroFieldProcessor generateBytesTypeHandler() {
- return new JsonToAvroFieldProcessor() {
- @Override
- public Pair<Boolean, Object> convert(Object value, String name, Schema
schema, boolean shouldSanitize, String invalidCharMask) {
- // Should return ByteBuffer (see GenericData.isBytes())
- return Pair.of(true, ByteBuffer.wrap(value.toString().getBytes()));
- }
- };
+ private static class AvroEnumTypeProcessor extends EnumTypeProcessor {
+ @Override
+ public Pair<Boolean, Object> convert(Object value, String name, Schema
schema) {
+ return Pair.of(true, new GenericData.EnumSymbol(schema,
convertToJavaObject(value, name, schema)));
}
+ }
- private static JsonToAvroFieldProcessor generateFixedTypeHandler() {
- return new JsonToAvroFieldProcessor() {
- @Override
- public Pair<Boolean, Object> convert(Object value, String name, Schema
schema, boolean shouldSanitize, String invalidCharMask) {
- // The ObjectMapper use List to represent FixedType
- // eg: "decimal_val": [0, 0, 14, -63, -52] will convert to
ArrayList<Integer>
- List<Integer> converval = (List<Integer>) value;
- byte[] src = new byte[converval.size()];
- for (int i = 0; i < converval.size(); i++) {
- src[i] = converval.get(i).byteValue();
- }
- byte[] dst = new byte[schema.getFixedSize()];
- System.arraycopy(src, 0, dst, 0, Math.min(schema.getFixedSize(),
src.length));
- return Pair.of(true, new GenericData.Fixed(schema, dst));
- }
- };
- }
+ protected JsonFieldProcessor generateEnumTypeHandler() {
+ return new AvroEnumTypeProcessor();
+ }
- private static JsonToAvroFieldProcessor generateEnumTypeHandler() {
- return new JsonToAvroFieldProcessor() {
- @Override
- public Pair<Boolean, Object> convert(Object value, String name, Schema
schema, boolean shouldSanitize, String invalidCharMask) {
- if (schema.getEnumSymbols().contains(value.toString())) {
- return Pair.of(true, new GenericData.EnumSymbol(schema,
value.toString()));
- }
- throw new HoodieJsonToAvroConversionException(String.format("Symbol
%s not in enum", value.toString()),
- schema.getFullName(), schema, shouldSanitize, invalidCharMask);
- }
- };
- }
+ protected JsonFieldProcessor generateRecordTypeHandler() {
+ return new JsonFieldProcessor() {
+ @Override
+ public Pair<Boolean, Object> convert(Object value, String name, Schema
schema) {
+ return Pair.of(true, convertJsonToAvro((Map<String, Object>) value,
schema));
+ }
+ };
+ }
- private static JsonToAvroFieldProcessor generateRecordTypeHandler() {
- return new JsonToAvroFieldProcessor() {
- @Override
- public Pair<Boolean, Object> convert(Object value, String name, Schema
schema, boolean shouldSanitize, String invalidCharMask) {
- return Pair.of(true, convertJsonToAvro((Map<String, Object>) value,
schema, shouldSanitize, invalidCharMask));
+ protected JsonFieldProcessor generateArrayTypeHandler() {
+ return new JsonFieldProcessor() {
+ private List<Object> convertToJavaObject(Object value, String name,
Schema schema) {
+ Schema elementSchema = schema.getElementType();
+ List<Object> listRes = new ArrayList<>();
+ for (Object v : (List) value) {
+ listRes.add(convertJsonField(v, name, elementSchema));
}
- };
- }
+ return listRes;
+ }
- private static JsonToAvroFieldProcessor generateArrayTypeHandler() {
- return new JsonToAvroFieldProcessor() {
- @Override
- public Pair<Boolean, Object> convert(Object value, String name, Schema
schema, boolean shouldSanitize, String invalidCharMask) {
- Schema elementSchema = schema.getElementType();
- List<Object> listRes = new ArrayList<>();
- for (Object v : (List) value) {
- listRes.add(convertJsonToAvroField(v, name, elementSchema,
shouldSanitize, invalidCharMask));
- }
- return Pair.of(true, new GenericData.Array<>(schema, listRes));
- }
- };
- }
+ @Override
+ public Pair<Boolean, Object> convert(Object value, String name, Schema
schema) {
+ return Pair.of(true, new GenericData.Array<>(
+ schema,
+ convertToJavaObject(
+ value,
+ name,
+ schema)));
+ }
+ };
+ }
- private static JsonToAvroFieldProcessor generateMapTypeHandler() {
- return new JsonToAvroFieldProcessor() {
- @Override
- public Pair<Boolean, Object> convert(Object value, String name, Schema
schema, boolean shouldSanitize, String invalidCharMask) {
- Schema valueSchema = schema.getValueType();
- Map<String, Object> mapRes = new HashMap<>();
- for (Map.Entry<String, Object> v : ((Map<String, Object>)
value).entrySet()) {
- mapRes.put(v.getKey(), convertJsonToAvroField(v.getValue(), name,
valueSchema, shouldSanitize, invalidCharMask));
- }
- return Pair.of(true, mapRes);
+ protected JsonFieldProcessor generateMapTypeHandler() {
+ return new JsonFieldProcessor() {
+ @Override
+ public Pair<Boolean, Object> convert(Object value, String name, Schema
schema) {
+ Schema valueSchema = schema.getValueType();
+ Map<String, Object> mapRes = new HashMap<>();
+ for (Map.Entry<String, Object> v : ((Map<String, Object>)
value).entrySet()) {
+ mapRes.put(v.getKey(), convertJsonField(v.getValue(), name,
valueSchema));
}
- };
- }
+ return Pair.of(true, mapRes);
+ }
+ };
}
- /**
- * Exception Class for any schema conversion issue.
- */
- public static class HoodieJsonToAvroConversionException extends
HoodieException {
-
- private final Object value;
-
- private final String fieldName;
- private final Schema schema;
- private final boolean shouldSanitize;
- private final String invalidCharMask;
-
- public HoodieJsonToAvroConversionException(Object value, String fieldName,
Schema schema, boolean shouldSanitize, String invalidCharMask) {
- this.value = value;
- this.fieldName = fieldName;
- this.schema = schema;
- this.shouldSanitize = shouldSanitize;
- this.invalidCharMask = invalidCharMask;
- }
-
- @Override
- public String toString() {
- if (shouldSanitize) {
- return String.format("Json to Avro Type conversion error for field %s,
%s for %s. Field sanitization is enabled with a mask of %s.", fieldName, value,
schema, invalidCharMask);
- }
- return String.format("Json to Avro Type conversion error for field %s,
%s for %s", fieldName, value, schema);
+ protected HoodieJsonConversionException buildConversionException(Object
value, String fieldName, Schema schema, boolean shouldSanitize, String
invalidCharMask) {
+ String errorMsg;
+ if (shouldSanitize) {
+ errorMsg = String.format("Json to Avro Type conversion error for field
%s, %s for %s. Field sanitization is enabled with a mask of %s.", fieldName,
value, schema, invalidCharMask);
+ } else {
+ errorMsg = String.format("Json to Avro Type conversion error for field
%s, %s for %s", fieldName, value, schema);
}
+ return new HoodieJsonConversionException(errorMsg);
}
-}
+
+}
\ No newline at end of file
diff --git
a/hudi-common/src/main/java/org/apache/hudi/avro/processors/DateLogicalTypeProcessor.java
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/DateLogicalTypeProcessor.java
new file mode 100644
index 00000000000..9d00e53d261
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/DateLogicalTypeProcessor.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro.processors;
+
+import org.apache.hudi.avro.AvroLogicalTypeEnum;
+import org.apache.hudi.common.util.collection.Pair;
+
+import java.time.LocalDate;
+import java.time.format.DateTimeParseException;
+
+public abstract class DateLogicalTypeProcessor extends
TimeLogicalTypeProcessor {
+ public DateLogicalTypeProcessor() {
+ super(AvroLogicalTypeEnum.DATE);
+ }
+
+ protected Pair<Boolean, LocalDate> convertToLocalDate(String input) {
+ // Parse the input timestamp, DateTimeFormatter.ISO_LOCAL_TIME is implied
here
+ LocalDate date = null;
+ try {
+ // Try parsing as an ISO date
+ date = LocalDate.parse(input);
+ } catch (DateTimeParseException ignore) {
+ /* ignore */
+ }
+ return Pair.of(date != null, date);
+ }
+}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/avro/processors/DecimalLogicalTypeProcessor.java
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/DecimalLogicalTypeProcessor.java
new file mode 100644
index 00000000000..1c49e086e46
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/DecimalLogicalTypeProcessor.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro.processors;
+
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+
+import java.math.BigDecimal;
+
+public abstract class DecimalLogicalTypeProcessor extends JsonFieldProcessor {
+
+ /**
+ * Check if the given schema is a valid decimal type configuration.
+ */
+ protected static boolean isValidDecimalTypeConfig(Schema schema) {
+ LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal)
schema.getLogicalType();
+ // At the time when the schema is found not valid when it is parsed, the
Avro Schema.parse will just silently
+ // set the schema to be null instead of throwing exceptions.
Correspondingly, we just check if it is null here.
+ if (decimalType == null) {
+ return false;
+ }
+ // Even though schema is validated at schema parsing phase, still validate
here to be defensive.
+ decimalType.validate(schema);
+ return true;
+ }
+
+ /**
+ * Parse the object to BigDecimal.
+ *
+ * @param obj Object to be parsed
+ * @return Pair object, with left as boolean indicating if the parsing was
successful and right as the
+ * BigDecimal value.
+ */
+ protected static Pair<Boolean, BigDecimal> parseObjectToBigDecimal(Object
obj) {
+ if (obj instanceof Number) {
+ return Pair.of(true, BigDecimal.valueOf(((Number) obj).doubleValue()));
+ }
+
+ // Case 2: Object is a number in String format.
+ if (obj instanceof String) {
+ BigDecimal bigDecimal = null;
+ try {
+ bigDecimal = new BigDecimal(((String) obj));
+ } catch (java.lang.NumberFormatException ignored) {
+ /* ignore */
+ }
+ return Pair.of(bigDecimal != null, bigDecimal);
+ }
+ return Pair.of(false, null);
+ }
+}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/avro/processors/DurationLogicalTypeProcessor.java
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/DurationLogicalTypeProcessor.java
new file mode 100644
index 00000000000..47e37db5c50
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/DurationLogicalTypeProcessor.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro.processors;
+
+import org.apache.hudi.avro.AvroLogicalTypeEnum;
+
+import org.apache.avro.LogicalType;
+import org.apache.avro.Schema;
+
+import java.util.List;
+
+public abstract class DurationLogicalTypeProcessor extends JsonFieldProcessor {
+ private static final int NUM_ELEMENTS_FOR_DURATION_TYPE = 3;
+
+ /**
+ * We expect the input to be a list of 3 integers representing months, days
and milliseconds.
+ */
+ protected boolean isValidDurationInput(Object value) {
+ if (!(value instanceof List<?>)) {
+ return false;
+ }
+ List<?> list = (List<?>) value;
+ if (list.size() != NUM_ELEMENTS_FOR_DURATION_TYPE) {
+ return false;
+ }
+ for (Object element : list) {
+ if (!(element instanceof Integer)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Check if the given schema is a valid decimal type configuration.
+ */
+ protected static boolean isValidDurationTypeConfig(Schema schema) {
+ String durationTypeName = AvroLogicalTypeEnum.DURATION.getValue();
+ LogicalType durationType = schema.getLogicalType();
+ String durationTypeProp = schema.getProp("logicalType");
+ // 1. The Avro type should be "Fixed".
+ // 2. Fixed size must be of 12 bytes as it hold 3 integers.
+ // 3. Logical type name should be "duration". The name might be stored in
different places based on Avro version
+ // being used here.
+ return schema.getType().equals(Schema.Type.FIXED)
+ && schema.getFixedSize() == Integer.BYTES *
NUM_ELEMENTS_FOR_DURATION_TYPE
+ && (durationType != null &&
durationType.getName().equals(durationTypeName)
+ || durationTypeProp != null &&
durationTypeProp.equals(durationTypeName));
+ }
+}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/avro/processors/EnumTypeProcessor.java
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/EnumTypeProcessor.java
new file mode 100644
index 00000000000..d21ace8a8a5
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/EnumTypeProcessor.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro.processors;
+
+import org.apache.hudi.exception.HoodieJsonToAvroConversionException;
+
+import org.apache.avro.Schema;
+
+public abstract class EnumTypeProcessor extends JsonFieldProcessor {
+
+ protected Object convertToJavaObject(Object value, String name, Schema
schema) {
+ if (schema.getEnumSymbols().contains(value.toString())) {
+ return value.toString();
+ }
+ throw new HoodieJsonToAvroConversionException(String.format("Symbol %s not
in enum", value));
+ }
+}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/avro/processors/FixedTypeProcessor.java
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/FixedTypeProcessor.java
new file mode 100644
index 00000000000..0604ddad19c
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/FixedTypeProcessor.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro.processors;
+
+import org.apache.avro.Schema;
+
+import java.util.List;
+
+public abstract class FixedTypeProcessor extends JsonFieldProcessor {
+ protected byte[] convertToJavaObject(Object value, String name, Schema
schema) {
+ // The ObjectMapper use List to represent FixedType
+ // eg: "decimal_val": [0, 0, 14, -63, -52] will convert to
ArrayList<Integer>
+ List<Integer> converval = (List<Integer>) value;
+ byte[] src = new byte[converval.size()];
+ for (int i = 0; i < converval.size(); i++) {
+ src[i] = converval.get(i).byteValue();
+ }
+ byte[] dst = new byte[schema.getFixedSize()];
+ System.arraycopy(src, 0, dst, 0, Math.min(schema.getFixedSize(),
src.length));
+ return dst;
+ }
+}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/avro/processors/JsonFieldProcessor.java
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/JsonFieldProcessor.java
new file mode 100644
index 00000000000..dd76c463eca
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/JsonFieldProcessor.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro.processors;
+
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieJsonConversionException;
+
+import org.apache.avro.Schema;
+
+import java.io.Serializable;
+
+public abstract class JsonFieldProcessor implements Serializable {
+
+ public Object convertField(Object value, String name, Schema schema) {
+ Pair<Boolean, Object> res = convert(value, name, schema);
+ if (!res.getLeft()) {
+ throw new HoodieJsonConversionException("failed to convert json to
avro");
+ }
+ return res.getRight();
+ }
+
+ public abstract Pair<Boolean, Object> convert(Object value, String name,
Schema schema);
+}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/avro/processors/LocalTimestampMicroLogicalTypeProcessor.java
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/LocalTimestampMicroLogicalTypeProcessor.java
new file mode 100644
index 00000000000..20ea08de9d8
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/LocalTimestampMicroLogicalTypeProcessor.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro.processors;
+
+import org.apache.hudi.avro.AvroLogicalTypeEnum;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.avro.Schema;
+
+import java.time.LocalDateTime;
+import java.time.temporal.ChronoField;
+
+/**
+ * Processor for TimeMicro logical type.
+ */
+public class LocalTimestampMicroLogicalTypeProcessor extends
TimeLogicalTypeProcessor {
+ public LocalTimestampMicroLogicalTypeProcessor() {
+ super(AvroLogicalTypeEnum.LOCAL_TIMESTAMP_MICROS);
+ }
+
+ @Override
+ public Pair<Boolean, Object> convert(
+ Object value, String name, Schema schema) {
+ return convertCommon(
+ new Parser.LongParser() {
+ @Override
+ public Pair<Boolean, Object> handleStringValue(String value) {
+ if (!isWellFormedDateTime(value)) {
+ return Pair.of(false, null);
+ }
+ Pair<Boolean, LocalDateTime> result =
convertToLocalDateTime(value);
+ if (!result.getLeft()) {
+ return Pair.of(false, null);
+ }
+ LocalDateTime time = result.getRight();
+
+ // Calculate the difference in milliseconds
+ long diffInMicros = LOCAL_UNIX_EPOCH.until(time,
ChronoField.MICRO_OF_SECOND.getBaseUnit());
+ return Pair.of(true, diffInMicros);
+ }
+ },
+ value, schema);
+ }
+}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/avro/processors/LocalTimestampMilliLogicalTypeProcessor.java
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/LocalTimestampMilliLogicalTypeProcessor.java
new file mode 100644
index 00000000000..62753daf484
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/LocalTimestampMilliLogicalTypeProcessor.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro.processors;
+
+import org.apache.hudi.avro.AvroLogicalTypeEnum;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.avro.Schema;
+
+import java.time.LocalDateTime;
+import java.time.temporal.ChronoField;
+
+public class LocalTimestampMilliLogicalTypeProcessor extends
TimeLogicalTypeProcessor {
+ public LocalTimestampMilliLogicalTypeProcessor() {
+ super(AvroLogicalTypeEnum.LOCAL_TIMESTAMP_MILLIS);
+ }
+
+ @Override
+ public Pair<Boolean, Object> convert(
+ Object value, String name, Schema schema) {
+ return convertCommon(
+ new Parser.LongParser() {
+ @Override
+ public Pair<Boolean, Object> handleStringValue(String value) {
+ if (!isWellFormedDateTime(value)) {
+ return Pair.of(false, null);
+ }
+ Pair<Boolean, LocalDateTime> result =
convertToLocalDateTime(value);
+ if (!result.getLeft()) {
+ return Pair.of(false, null);
+ }
+ LocalDateTime time = result.getRight();
+
+ // Calculate the difference in milliseconds
+ long diffInMillis = LOCAL_UNIX_EPOCH.until(time,
ChronoField.MILLI_OF_SECOND.getBaseUnit());
+ return Pair.of(true, diffInMillis);
+ }
+ },
+ value, schema);
+ }
+}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/avro/processors/Parser.java
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/Parser.java
new file mode 100644
index 00000000000..4a6e4ee9fb9
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/processors/Parser.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro.processors;
+
+import org.apache.hudi.common.util.collection.Pair;
+
+public abstract class Parser {
+ abstract Pair<Boolean, Object> handleNumberValue(Number value);
+
+ abstract Pair<Boolean, Object> handleStringNumber(String value);
+
+ abstract Pair<Boolean, Object> handleStringValue(String value);
+
+ public static class IntParser extends Parser {
+ @Override
+ public Pair<Boolean, Object> handleNumberValue(Number value) {
+ return Pair.of(true, value.intValue());
+ }
+
+ @Override
+ public Pair<Boolean, Object> handleStringNumber(String value) {
+ return Pair.of(true, Integer.parseInt(value));
+ }
+
+ @Override
+ public Pair<Boolean, Object> handleStringValue(String value) {
+ return Pair.of(true, Integer.valueOf(value));
+ }
+ }
+
+ public static class DateParser extends Parser {
+
+ private static long MILLI_SECONDS_PER_DAY = 86400000;
+
+ @Override
+ public Pair<Boolean, Object> handleNumberValue(Number value) {
+ return Pair.of(true, new java.sql.Date(value.intValue() *
MILLI_SECONDS_PER_DAY));
+ }
+
+ @Override
+ public Pair<Boolean, Object> handleStringNumber(String value) {
+ return Pair.of(true, new java.sql.Date(Integer.parseInt(value) *
MILLI_SECONDS_PER_DAY));
+ }
+
+ @Override
+ public Pair<Boolean, Object> handleStringValue(String value) {
+ return Pair.of(true, java.sql.Date.valueOf(value));
+ }
+ }
+
+ public static class LongParser extends Parser {
+ @Override
+ public Pair<Boolean, Object> handleNumberValue(Number value) {
+ return Pair.of(true, value.longValue());
+ }
+
+ @Override
+ public Pair<Boolean, Object> handleStringNumber(String value) {
+ return Pair.of(true, Long.parseLong(value));
+ }
+
+ @Override
+ public Pair<Boolean, Object> handleStringValue(String value) {
+ return Pair.of(true, Long.valueOf(value));
+ }
+ }
+}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/avro/processors/TimeLogicalTypeProcessor.java
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/TimeLogicalTypeProcessor.java
new file mode 100644
index 00000000000..9cb3b20fb0b
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/TimeLogicalTypeProcessor.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro.processors;
+
+import org.apache.hudi.avro.AvroLogicalTypeEnum;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.avro.LogicalType;
+import org.apache.avro.Schema;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZonedDateTime;
+import java.time.chrono.IsoChronology;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
+import java.time.format.DateTimeParseException;
+import java.time.format.ResolverStyle;
+import java.time.temporal.ChronoField;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.regex.Pattern;
+
+import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE;
+import static java.time.format.DateTimeFormatter.ISO_LOCAL_TIME;
+
+/**
+ * Base Class for converting object to avro logical type TimeMilli/TimeMicro.
+ */
+public abstract class TimeLogicalTypeProcessor extends JsonFieldProcessor {
+
+ protected static final LocalDateTime LOCAL_UNIX_EPOCH =
LocalDateTime.of(1970, 1, 1, 0, 0, 0, 0);
+
+ // Logical type the processor is handling.
+ private final AvroLogicalTypeEnum logicalTypeEnum;
+
+ public TimeLogicalTypeProcessor(AvroLogicalTypeEnum logicalTypeEnum) {
+ this.logicalTypeEnum = logicalTypeEnum;
+ }
+
+ /**
+ * Main function that convert input to Object with java data type specified
by schema
+ */
+ public Pair<Boolean, Object> convertCommon(Parser parser, Object value,
Schema schema) {
+ LogicalType logicalType = schema.getLogicalType();
+ if (logicalType == null) {
+ return Pair.of(false, null);
+ }
+ logicalType.validate(schema);
+ if (value instanceof Number) {
+ return parser.handleNumberValue((Number) value);
+ }
+ if (value instanceof String) {
+ String valStr = (String) value;
+ if (ALL_DIGITS_WITH_OPTIONAL_SIGN.matcher(valStr).matches()) {
+ return parser.handleStringNumber(valStr);
+ } else {
+ return parser.handleStringValue(valStr);
+ }
+ }
+ return Pair.of(false, null);
+ }
+
+ protected DateTimeFormatter getDateTimeFormatter() {
+ DateTimeParseContext ctx =
DATE_TIME_PARSE_CONTEXT_MAP.get(logicalTypeEnum);
+ return ctx == null ? null : ctx.dateTimeFormatter;
+ }
+
+ protected Pattern getDateTimePattern() {
+ DateTimeParseContext ctx =
DATE_TIME_PARSE_CONTEXT_MAP.get(logicalTypeEnum);
+ return ctx == null ? null : ctx.dateTimePattern;
+ }
+
+ // Depending on the logical type the processor handles, they use different
parsing context
+ // when they need to parse a timestamp string in handleStringValue.
+ private static class DateTimeParseContext {
+ public DateTimeParseContext(DateTimeFormatter dateTimeFormatter, Pattern
dateTimePattern) {
+ this.dateTimeFormatter = dateTimeFormatter;
+ this.dateTimePattern = dateTimePattern;
+ }
+
+ public final Pattern dateTimePattern;
+
+ public final DateTimeFormatter dateTimeFormatter;
+ }
+
+ private static final Map<AvroLogicalTypeEnum, DateTimeParseContext>
DATE_TIME_PARSE_CONTEXT_MAP = getParseContext();
+
+ private static Map<AvroLogicalTypeEnum, DateTimeParseContext>
getParseContext() {
+ // The pattern is derived from ISO_LOCAL_DATE_TIME definition with the
relaxation on the separator.
+ DateTimeFormatter localDateTimeFormatter = new DateTimeFormatterBuilder()
+ .parseCaseInsensitive()
+ .append(ISO_LOCAL_DATE)
+ .optionalStart()
+ .appendLiteral('T')
+ .optionalEnd()
+ .optionalStart()
+ .appendLiteral(' ')
+ .optionalEnd()
+ .append(ISO_LOCAL_TIME)
+ .toFormatter()
+ .withResolverStyle(ResolverStyle.STRICT)
+ .withChronology(IsoChronology.INSTANCE);
+ // Formatter for parsing timestamp.
+ // The pattern is derived from ISO_OFFSET_DATE_TIME definition with the
relaxation on the separator.
+ // Pattern asserts the string is
+ // <optional sign><Year>-<Month>-<Day><separator><Hour>:<Minute> +
optional <second> + optional <fractional second> + optional <zone offset>
+ // <separator> is 'T' or ' '
+ // For example, "2024-07-13T11:36:01.951Z",
"2024-07-13T11:36:01.951+01:00",
+ // "2024-07-13T11:36:01Z", "2024-07-13T11:36:01+01:00",
+ // "2024-07-13 11:36:01.951Z", "2024-07-13 11:36:01.951+01:00".
+ // See TestMercifulJsonConverter#timestampLogicalTypeGoodCaseTest
+ // and #timestampLogicalTypeBadTest for supported and unsupported cases.
+ DateTimeParseContext dateTimestampParseContext = new DateTimeParseContext(
+ new DateTimeFormatterBuilder()
+ .parseCaseInsensitive()
+ .append(localDateTimeFormatter)
+ .optionalStart()
+ .appendOffsetId()
+ .optionalEnd()
+ .parseDefaulting(ChronoField.OFFSET_SECONDS, 0L)
+ .toFormatter()
+ .withResolverStyle(ResolverStyle.STRICT)
+ .withChronology(IsoChronology.INSTANCE),
+ null /* match everything*/);
+ // Formatter for parsing time of day. The pattern is derived from
ISO_LOCAL_TIME definition.
+ // Pattern asserts the string is
+ // <optional sign><Hour>:<Minute> + optional <second> + optional
<fractional second>
+ // For example, "11:36:01.951".
+ // See TestMercifulJsonConverter#timeLogicalTypeTest
+ // and #timeLogicalTypeBadCaseTest for supported and unsupported cases.
+ DateTimeParseContext dateTimeParseContext = new DateTimeParseContext(
+ ISO_LOCAL_TIME,
+ Pattern.compile("^[+-]?\\d{2}:\\d{2}(?::\\d{2}(?:\\.\\d{1,9})?)?"));
+ // Formatter for parsing local timestamp.
+ // The pattern is derived from ISO_LOCAL_DATE_TIME definition with the
relaxation on the separator.
+ // Pattern asserts the string is
+ // <optional sign><Year>-<Month>-<Day><separator><Hour>:<Minute> +
optional <second> + optional <fractional second>
+ // <separator> is 'T' or ' '
+ // For example, "2024-07-13T11:36:01.951", "2024-07-13 11:36:01.951".
+ // See TestMercifulJsonConverter#localTimestampLogicalTypeGoodCaseTest
+ // and #localTimestampLogicalTypeBadTest for supported and unsupported
cases.
+ DateTimeParseContext localTimestampParseContext = new DateTimeParseContext(
+ localDateTimeFormatter,
+ Pattern.compile("^[+-]?\\d{4,10}-\\d{2}-\\d{2}[T
]\\d{2}:\\d{2}(?::\\d{2}(?:\\.\\d{1,9})?)?")
+ );
+ // Formatter for parsing local date. The pattern is derived from
ISO_LOCAL_DATE definition.
+ // Pattern asserts the string is
+ // <optional sign><Year>-<Month>-<Day>
+ // For example, "2024-07-13".
+ // See TestMercifulJsonConverter#dateLogicalTypeTest for supported and
unsupported cases.
+ DateTimeParseContext localDateParseContext = new DateTimeParseContext(
+ ISO_LOCAL_DATE,
+ Pattern.compile("^[+-]?\\d{4,10}-\\d{2}-\\d{2}?")
+ );
+
+ EnumMap<AvroLogicalTypeEnum, DateTimeParseContext> ctx = new
EnumMap<>(AvroLogicalTypeEnum.class);
+ ctx.put(AvroLogicalTypeEnum.TIME_MICROS, dateTimeParseContext);
+ ctx.put(AvroLogicalTypeEnum.TIME_MILLIS, dateTimeParseContext);
+ ctx.put(AvroLogicalTypeEnum.DATE, localDateParseContext);
+ ctx.put(AvroLogicalTypeEnum.LOCAL_TIMESTAMP_MICROS,
localTimestampParseContext);
+ ctx.put(AvroLogicalTypeEnum.LOCAL_TIMESTAMP_MILLIS,
localTimestampParseContext);
+ ctx.put(AvroLogicalTypeEnum.TIMESTAMP_MICROS, dateTimestampParseContext);
+ ctx.put(AvroLogicalTypeEnum.TIMESTAMP_MILLIS, dateTimestampParseContext);
+ return Collections.unmodifiableMap(ctx);
+ }
+
+ // Pattern validating if it is an number in string form.
+ // Only check at most 19 digits as this is the max num of digits for
LONG.MAX_VALUE to contain the cost of regex matching.
+ protected static final Pattern ALL_DIGITS_WITH_OPTIONAL_SIGN =
Pattern.compile("^[-+]?\\d{1,19}$");
+
+ /**
+ * Check if the given string is a well-formed date time string.
+ * If no pattern is defined, it will always return true.
+ */
+ protected boolean isWellFormedDateTime(String value) {
+ Pattern pattern = getDateTimePattern();
+ return pattern == null || pattern.matcher(value).matches();
+ }
+
+ protected Pair<Boolean, Instant> convertToInstantTime(String input) {
+ // Parse the input timestamp
+ // The input string is assumed in the format:
+ // <optional sign><Year>-<Month>-<Day><separator><Hour>:<Minute> +
optional <second> + optional <fractional second> + optional <zone offset>
+ // <separator> is 'T' or ' '
+ Instant time = null;
+ try {
+ ZonedDateTime dateTime = ZonedDateTime.parse(input,
getDateTimeFormatter());
+ time = dateTime.toInstant();
+ } catch (DateTimeParseException ignore) {
+ /* ignore */
+ }
+ return Pair.of(time != null, time);
+ }
+
+ protected Pair<Boolean, LocalTime> convertToLocalTime(String input) {
+ // Parse the input timestamp, DateTimeFormatter.ISO_LOCAL_TIME is implied
here
+ LocalTime time = null;
+ try {
+ // Try parsing as an ISO date
+ time = LocalTime.parse(input);
+ } catch (DateTimeParseException ignore) {
+ /* ignore */
+ }
+ return Pair.of(time != null, time);
+ }
+
+ protected Pair<Boolean, LocalDateTime> convertToLocalDateTime(String input) {
+ // Parse the input timestamp, DateTimeFormatter.ISO_LOCAL_DATE_TIME is
implied here
+ LocalDateTime time = null;
+ try {
+ // Try parsing as an ISO date
+ time = LocalDateTime.parse(input, getDateTimeFormatter());
+ } catch (DateTimeParseException ignore) {
+ /* ignore */
+ }
+ return Pair.of(time != null, time);
+ }
+
+ protected Pair<Boolean, Object> convertDateTime(
+ String value,
+ Function<LocalTime, Object> localTimeFunction,
+ Function<Instant, Object> instantTimeFunction) {
+
+ if (!isWellFormedDateTime(value)) {
+ return Pair.of(false, null);
+ }
+
+ if (localTimeFunction != null) {
+ Pair<Boolean, LocalTime> result = convertToLocalTime(value);
+ if (!result.getLeft()) {
+ return Pair.of(false, null);
+ }
+ return Pair.of(true, localTimeFunction.apply(result.getRight()));
+ }
+
+ if (instantTimeFunction != null) {
+ Pair<Boolean, Instant> result = convertToInstantTime(value);
+ if (!result.getLeft()) {
+ return Pair.of(false, null);
+ }
+ return Pair.of(true, instantTimeFunction.apply(result.getRight()));
+ }
+
+ return Pair.of(false, null); // Fallback in case of error
+ }
+}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/avro/processors/TimeMicroLogicalTypeProcessor.java
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/TimeMicroLogicalTypeProcessor.java
new file mode 100644
index 00000000000..75a011ed28e
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/TimeMicroLogicalTypeProcessor.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro.processors;
+
+import org.apache.hudi.avro.AvroLogicalTypeEnum;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.avro.Schema;
+
+/**
+ * Processor for TimeMicro logical type.
+ */
+public class TimeMicroLogicalTypeProcessor extends TimeLogicalTypeProcessor {
+ public TimeMicroLogicalTypeProcessor() {
+ super(AvroLogicalTypeEnum.TIME_MICROS);
+ }
+
+ @Override
+ public Pair<Boolean, Object> convert(
+ Object value, String name, Schema schema) {
+ return convertCommon(
+ new Parser.LongParser() {
+ @Override
+ public Pair<Boolean, Object> handleStringValue(String value) {
+ return convertDateTime(
+ value,
+ time -> (long) time.toSecondOfDay() * 1000000 + time.getNano()
/ 1000, // Micros of day
+ null);
+ }
+ },
+ value, schema);
+ }
+}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/avro/processors/TimeMilliLogicalTypeProcessor.java
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/TimeMilliLogicalTypeProcessor.java
new file mode 100644
index 00000000000..f6f2ed2c2bf
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/TimeMilliLogicalTypeProcessor.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro.processors;
+
+import org.apache.hudi.avro.AvroLogicalTypeEnum;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.avro.Schema;
+
+/**
+ * Processor for TimeMilli logical type.
+ */
+public class TimeMilliLogicalTypeProcessor extends TimeLogicalTypeProcessor {
+ public TimeMilliLogicalTypeProcessor() {
+ super(AvroLogicalTypeEnum.TIME_MILLIS);
+ }
+
+ @Override
+ public Pair<Boolean, Object> convert(
+ Object value, String name, Schema schema) {
+ return convertCommon(
+ new Parser.IntParser() {
+ @Override
+ public Pair<Boolean, Object> handleStringValue(String value) {
+ return convertDateTime(
+ value,
+ time -> time.toSecondOfDay() * 1000 + time.getNano() /
1000000, // Millis of day
+ null);
+ }
+ },
+ value, schema);
+ }
+}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/avro/processors/TimestampMicroLogicalTypeProcessor.java
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/TimestampMicroLogicalTypeProcessor.java
new file mode 100644
index 00000000000..b1cf7353382
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/TimestampMicroLogicalTypeProcessor.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro.processors;
+
+import org.apache.hudi.avro.AvroLogicalTypeEnum;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.avro.Schema;
+
+import java.time.Instant;
+import java.time.temporal.ChronoField;
+
+/**
+ * Processor for TimeMicro logical type.
+ */
+public class TimestampMicroLogicalTypeProcessor extends
TimeLogicalTypeProcessor {
+ public TimestampMicroLogicalTypeProcessor() {
+ super(AvroLogicalTypeEnum.TIMESTAMP_MICROS);
+ }
+
+ @Override
+ public Pair<Boolean, Object> convert(
+ Object value, String name, Schema schema) {
+ return convertCommon(
+ new Parser.LongParser() {
+ @Override
+ public Pair<Boolean, Object> handleStringValue(String value) {
+ return convertDateTime(
+ value,
+ null,
+ time -> Instant.EPOCH.until(time,
ChronoField.MICRO_OF_SECOND.getBaseUnit())); // Diff in micro
+ }
+ },
+ value, schema);
+ }
+}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/avro/processors/TimestampMilliLogicalTypeProcessor.java
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/TimestampMilliLogicalTypeProcessor.java
new file mode 100644
index 00000000000..5c5493f38b3
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/TimestampMilliLogicalTypeProcessor.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro.processors;
+
+import org.apache.hudi.avro.AvroLogicalTypeEnum;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.avro.Schema;
+
+import java.time.Instant;
+import java.time.temporal.ChronoField;
+
+/**
+ * Processor for TimeMicro logical type.
+ */
+public class TimestampMilliLogicalTypeProcessor extends
TimeLogicalTypeProcessor {
+ public TimestampMilliLogicalTypeProcessor() {
+ super(AvroLogicalTypeEnum.TIMESTAMP_MILLIS);
+ }
+
+ @Override
+ public Pair<Boolean, Object> convert(
+ Object value, String name, Schema schema) {
+ return convertCommon(
+ new Parser.LongParser() {
+ @Override
+ public Pair<Boolean, Object> handleStringValue(String value) {
+ return convertDateTime(
+ value,
+ null,
+ time -> Instant.EPOCH.until(time,
ChronoField.MILLI_OF_SECOND.getBaseUnit())); // Diff in millis
+ }
+ },
+ value, schema);
+ }
+}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/exception/HoodieJsonConversionException.java
b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieJsonConversionException.java
new file mode 100644
index 00000000000..08c476480a0
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieJsonConversionException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.exception;
+
+public class HoodieJsonConversionException extends HoodieException {
+
+ public HoodieJsonConversionException(String msg) {
+ super(msg);
+ }
+
+ public HoodieJsonConversionException(String msg, Throwable t) {
+ super(msg, t);
+ }
+}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/exception/HoodieJsonToAvroConversionException.java
b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieJsonToAvroConversionException.java
new file mode 100644
index 00000000000..b90cd53597e
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieJsonToAvroConversionException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.exception;
+
+/**
+ * Exception Class for any schema conversion issue.
+ */
+public class HoodieJsonToAvroConversionException extends
HoodieJsonConversionException {
+
+ public HoodieJsonToAvroConversionException(String msg) {
+ super(msg);
+ }
+
+ public HoodieJsonToAvroConversionException(String msg, Throwable t) {
+ super(msg, t);
+ }
+}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/avro/TestMercifulJsonConverter.java
b/hudi-common/src/test/java/org/apache/hudi/avro/TestMercifulJsonConverter.java
index 49e707a1c9e..0c9be20570b 100644
---
a/hudi-common/src/test/java/org/apache/hudi/avro/TestMercifulJsonConverter.java
+++
b/hudi-common/src/test/java/org/apache/hudi/avro/TestMercifulJsonConverter.java
@@ -19,6 +19,7 @@
package org.apache.hudi.avro;
import org.apache.hudi.common.testutils.SchemaTestUtil;
+import org.apache.hudi.exception.HoodieJsonToAvroConversionException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.avro.Conversions;
@@ -97,7 +98,7 @@ public class TestMercifulJsonConverter {
String json = MAPPER.writeValueAsString(data);
// Schedule with timestamp same as that of committed instant
- assertThrows(MercifulJsonConverter.HoodieJsonToAvroConversionException.class, () -> {
+ assertThrows(HoodieJsonToAvroConversionException.class, () -> {
CONVERTER.convert(json, schema);
});
}
@@ -266,7 +267,7 @@ public class TestMercifulJsonConverter {
Schema schema = SchemaTestUtil.getSchemaFromResourceFilePath(schemaFile);
// Schedule with timestamp same as that of committed instant
- assertThrows(MercifulJsonConverter.HoodieJsonToAvroConversionException.class, () -> {
+ assertThrows(HoodieJsonToAvroConversionException.class, () -> {
CONVERTER.convert(json, schema);
});
}
@@ -336,7 +337,7 @@ public class TestMercifulJsonConverter {
Map<String, Object> data = new HashMap<>();
data.put("dateField", input);
String json = MAPPER.writeValueAsString(data);
- assertThrows(MercifulJsonConverter.HoodieJsonToAvroConversionException.class, () -> {
+ assertThrows(HoodieJsonToAvroConversionException.class, () -> {
CONVERTER.convert(json, schema);
});
}
@@ -442,7 +443,7 @@ public class TestMercifulJsonConverter {
data.put("timestamp", input);
String json = MAPPER.writeValueAsString(data);
// Schedule with timestamp same as that of committed instant
- assertThrows(MercifulJsonConverter.HoodieJsonToAvroConversionException.class, () -> {
+ assertThrows(HoodieJsonToAvroConversionException.class, () -> {
CONVERTER.convert(json, schema);
});
}
@@ -565,7 +566,7 @@ public class TestMercifulJsonConverter {
data.put("timestampMillisField", validInput);
data.put("timestampMicrosField", badInput);
// Schedule with timestamp same as that of committed instant
- assertThrows(MercifulJsonConverter.HoodieJsonToAvroConversionException.class, () -> {
+ assertThrows(HoodieJsonToAvroConversionException.class, () -> {
CONVERTER.convert(MAPPER.writeValueAsString(data), schema);
});
@@ -573,7 +574,7 @@ public class TestMercifulJsonConverter {
data.put("timestampMillisField", badInput);
data.put("timestampMicrosField", validInput);
// Schedule with timestamp same as that of committed instant
- assertThrows(MercifulJsonConverter.HoodieJsonToAvroConversionException.class, () -> {
+ assertThrows(HoodieJsonToAvroConversionException.class, () -> {
CONVERTER.convert(MAPPER.writeValueAsString(data), schema);
});
}
@@ -657,7 +658,7 @@ public class TestMercifulJsonConverter {
data.put("timeMicroField", validInput);
data.put("timeMillisField", invalidInput);
// Schedule with timestamp same as that of committed instant
- assertThrows(MercifulJsonConverter.HoodieJsonToAvroConversionException.class, () -> {
+ assertThrows(HoodieJsonToAvroConversionException.class, () -> {
CONVERTER.convert(MAPPER.writeValueAsString(data), schema);
});
@@ -665,7 +666,7 @@ public class TestMercifulJsonConverter {
data.put("timeMicroField", invalidInput);
data.put("timeMillisField", validInput);
// Schedule with timestamp same as that of committed instant
- assertThrows(MercifulJsonConverter.HoodieJsonToAvroConversionException.class, () -> {
+ assertThrows(HoodieJsonToAvroConversionException.class, () -> {
CONVERTER.convert(MAPPER.writeValueAsString(data), schema);
});
}