This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 8335ce2e96a [HUDI-8105] Refactor MercifulJsonConverter to be extendible for Json to Row conversion (#11800)
8335ce2e96a is described below

commit 8335ce2e96a783483fac07639b8aff325cab9c43
Author: vamsikarnika <[email protected]>
AuthorDate: Tue Aug 27 15:27:42 2024 +0530

    [HUDI-8105] Refactor MercifulJsonConverter to be extendible for Json to Row conversion (#11800)
    
    * refactor merciful json converter
    
    * remove unused imports
    
    * fix lint issues
    
    * fix exception caught for spark2.4
    
    ---------
    
    Co-authored-by: Vamsi <[email protected]>
---
 .../apache/hudi/avro/MercifulJsonConverter.java    | 1042 ++++++--------------
 .../avro/processors/DateLogicalTypeProcessor.java  |   43 +
 .../processors/DecimalLogicalTypeProcessor.java    |   69 ++
 .../processors/DurationLogicalTypeProcessor.java   |   66 ++
 .../hudi/avro/processors/EnumTypeProcessor.java    |   33 +
 .../hudi/avro/processors/FixedTypeProcessor.java   |   38 +
 .../hudi/avro/processors/JsonFieldProcessor.java   |   39 +
 .../LocalTimestampMicroLogicalTypeProcessor.java   |   60 ++
 .../LocalTimestampMilliLogicalTypeProcessor.java   |   57 ++
 .../org/apache/hudi/avro/processors/Parser.java    |   83 ++
 .../avro/processors/TimeLogicalTypeProcessor.java  |  266 +++++
 .../processors/TimeMicroLogicalTypeProcessor.java  |   49 +
 .../processors/TimeMilliLogicalTypeProcessor.java  |   49 +
 .../TimestampMicroLogicalTypeProcessor.java        |   52 +
 .../TimestampMilliLogicalTypeProcessor.java        |   52 +
 .../exception/HoodieJsonConversionException.java   |   30 +
 .../HoodieJsonToAvroConversionException.java       |   33 +
 .../hudi/avro/TestMercifulJsonConverter.java       |   17 +-
 18 files changed, 1347 insertions(+), 731 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/avro/MercifulJsonConverter.java 
b/hudi-common/src/main/java/org/apache/hudi/avro/MercifulJsonConverter.java
index f738ab8e44a..5219a389662 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/MercifulJsonConverter.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/MercifulJsonConverter.java
@@ -18,11 +18,25 @@
 
 package org.apache.hudi.avro;
 
+import org.apache.hudi.avro.processors.DateLogicalTypeProcessor;
+import org.apache.hudi.avro.processors.DecimalLogicalTypeProcessor;
+import org.apache.hudi.avro.processors.DurationLogicalTypeProcessor;
+import org.apache.hudi.avro.processors.EnumTypeProcessor;
+import org.apache.hudi.avro.processors.FixedTypeProcessor;
+import org.apache.hudi.avro.processors.JsonFieldProcessor;
+import org.apache.hudi.avro.processors.LocalTimestampMicroLogicalTypeProcessor;
+import org.apache.hudi.avro.processors.LocalTimestampMilliLogicalTypeProcessor;
+import org.apache.hudi.avro.processors.Parser;
+import org.apache.hudi.avro.processors.TimeMicroLogicalTypeProcessor;
+import org.apache.hudi.avro.processors.TimeMilliLogicalTypeProcessor;
+import org.apache.hudi.avro.processors.TimestampMicroLogicalTypeProcessor;
+import org.apache.hudi.avro.processors.TimestampMilliLogicalTypeProcessor;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieJsonConversionException;
+import org.apache.hudi.exception.HoodieJsonToAvroConversionException;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.avro.Conversions;
@@ -35,17 +49,10 @@ import org.apache.avro.generic.GenericFixed;
 import org.apache.avro.generic.GenericRecord;
 
 import java.io.IOException;
-import java.io.Serializable;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
-import java.time.Instant;
 import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.LocalTime;
-import java.time.format.DateTimeFormatter;
-import java.time.format.DateTimeParseException;
-import java.time.temporal.ChronoField;
 import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -54,7 +61,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 /**
@@ -64,11 +70,13 @@ public class MercifulJsonConverter {
 
   // For each schema (keyed by full name), stores a mapping of schema field 
name to json field name to account for sanitization of fields
   private static final Map<String, Map<String, String>> 
SANITIZED_FIELD_MAPPINGS = new ConcurrentHashMap<>();
+  private final Map<Schema.Type, JsonFieldProcessor> fieldTypeProcessorMap;
+  private final Map<String, JsonFieldProcessor> fieldLogicalTypeProcessorMap;
 
-  private final ObjectMapper mapper;
+  protected final ObjectMapper mapper;
 
-  private final String invalidCharMask;
-  private final boolean shouldSanitize;
+  protected final String invalidCharMask;
+  protected final boolean shouldSanitize;
 
   /**
    * Uses a default objectMapper to deserialize a json string.
@@ -91,6 +99,8 @@ public class MercifulJsonConverter {
     this.mapper = mapper;
     this.shouldSanitize = shouldSanitize;
     this.invalidCharMask = invalidCharMask;
+    this.fieldTypeProcessorMap = getFieldTypeProcessors();
+    this.fieldLogicalTypeProcessorMap = getLogicalFieldTypeProcessors();
   }
 
   /**
@@ -104,9 +114,9 @@ public class MercifulJsonConverter {
   public GenericRecord convert(String json, Schema schema) {
     try {
       Map<String, Object> jsonObjectMap = mapper.readValue(json, Map.class);
-      return convertJsonToAvro(jsonObjectMap, schema, shouldSanitize, 
invalidCharMask);
-    } catch (IOException e) {
-      throw new HoodieIOException(e.getMessage(), e);
+      return convertJsonToAvro(jsonObjectMap, schema);
+    } catch (HoodieException | IOException e) {
+      throw new HoodieJsonToAvroConversionException("failed to convert json to 
avro", e);
     }
   }
 
@@ -118,18 +128,18 @@ public class MercifulJsonConverter {
     SANITIZED_FIELD_MAPPINGS.remove(schemaFullName);
   }
 
-  private static GenericRecord convertJsonToAvro(Map<String, Object> 
inputJson, Schema schema, boolean shouldSanitize, String invalidCharMask) {
+  private GenericRecord convertJsonToAvro(Map<String, Object> inputJson, 
Schema schema) {
     GenericRecord avroRecord = new GenericData.Record(schema);
     for (Schema.Field f : schema.getFields()) {
       Object val = shouldSanitize ? getFieldFromJson(f, inputJson, 
schema.getFullName(), invalidCharMask) : inputJson.get(f.name());
       if (val != null) {
-        avroRecord.put(f.pos(), convertJsonToAvroField(val, f.name(), 
f.schema(), shouldSanitize, invalidCharMask));
+        avroRecord.put(f.pos(), convertJsonField(val, f.name(), f.schema()));
       }
     }
     return avroRecord;
   }
 
-  private static Object getFieldFromJson(final Schema.Field fieldSchema, final 
Map<String, Object> inputJson, final String schemaFullName, final String 
invalidCharMask) {
+  protected static Object getFieldFromJson(final Schema.Field fieldSchema, 
final Map<String, Object> inputJson, final String schemaFullName, final String 
invalidCharMask) {
     Map<String, String> schemaToJsonFieldNames = 
SANITIZED_FIELD_MAPPINGS.computeIfAbsent(schemaFullName, unused -> new 
ConcurrentHashMap<>());
     if (!schemaToJsonFieldNames.containsKey(fieldSchema.name())) {
       // if we don't have field mapping, proactively populate as many as 
possible based on input json
@@ -154,19 +164,19 @@ public class MercifulJsonConverter {
     return null;
   }
 
-  private static Schema getNonNull(Schema schema) {
+  private Schema getNonNull(Schema schema) {
     List<Schema> types = schema.getTypes();
     Schema.Type firstType = types.get(0).getType();
     return firstType.equals(Schema.Type.NULL) ? types.get(1) : types.get(0);
   }
 
-  private static boolean isOptional(Schema schema) {
+  private boolean isOptional(Schema schema) {
     return schema.getType().equals(Schema.Type.UNION) && 
schema.getTypes().size() == 2
         && (schema.getTypes().get(0).getType().equals(Schema.Type.NULL)
         || schema.getTypes().get(1).getType().equals(Schema.Type.NULL));
   }
 
-  private static Object convertJsonToAvroField(Object value, String name, 
Schema schema, boolean shouldSanitize, String invalidCharMask) {
+  protected Object convertJsonField(Object value, String name, Schema schema) {
 
     if (isOptional(schema)) {
       if (value == null) {
@@ -176,790 +186,376 @@ public class MercifulJsonConverter {
       }
     } else if (value == null) {
       // Always fail on null for non-nullable schemas
-      throw new HoodieJsonToAvroConversionException(null, name, schema, 
shouldSanitize, invalidCharMask);
+      throw buildConversionException(String.format("Symbol %s not in enum", 
value.toString()),
+          schema.getFullName(), schema, shouldSanitize, invalidCharMask);
     }
 
-    return JsonToAvroFieldProcessorUtil.convertToAvro(value, name, schema, 
shouldSanitize, invalidCharMask);
+    return convertField(value, name, schema);
   }
 
-  private static class JsonToAvroFieldProcessorUtil {
-    /**
-     * Base Class for converting json to avro fields.
-     */
-    private abstract static class JsonToAvroFieldProcessor implements 
Serializable {
-
-      public Object convertToAvro(Object value, String name, Schema schema, 
boolean shouldSanitize, String invalidCharMask) {
-        Pair<Boolean, Object> res = convert(value, name, schema, 
shouldSanitize, invalidCharMask);
-        if (!res.getLeft()) {
-          throw new HoodieJsonToAvroConversionException(value, name, schema, 
shouldSanitize, invalidCharMask);
-        }
-        return res.getRight();
-      }
+  private Object convertField(Object value, String name, Schema schema) {
+    JsonFieldProcessor processor = getProcessorForSchema(schema);
+    return processor.convertField(value, name, schema);
+  }
 
-      protected abstract Pair<Boolean, Object> convert(Object value, String 
name, Schema schema, boolean shouldSanitize, String invalidCharMask);
-    }
+  protected JsonFieldProcessor getProcessorForSchema(Schema schema) {
+    JsonFieldProcessor processor = null;
 
-    public static Object convertToAvro(Object value, String name, Schema 
schema, boolean shouldSanitize, String invalidCharMask) {
-      JsonToAvroFieldProcessor processor = getProcessorForSchema(schema);
-      return processor.convertToAvro(value, name, schema, shouldSanitize, 
invalidCharMask);
+    // 3 cases to consider: customized logicalType, logicalType, and type.
+    String customizedLogicalType = schema.getProp("logicalType");
+    LogicalType logicalType = schema.getLogicalType();
+    Type type = schema.getType();
+    if (customizedLogicalType != null && !customizedLogicalType.isEmpty()) {
+      processor = fieldLogicalTypeProcessorMap.get(customizedLogicalType);
+    } else if (logicalType != null) {
+      processor = fieldLogicalTypeProcessorMap.get(logicalType.getName());
+    } else {
+      processor = fieldTypeProcessorMap.get(type);
     }
 
-    private static JsonToAvroFieldProcessor getProcessorForSchema(Schema 
schema) {
-      JsonToAvroFieldProcessor processor = null;
-
-      // 3 cases to consider: customized logicalType, logicalType, and type.
-      String customizedLogicalType = schema.getProp("logicalType");
-      LogicalType logicalType = schema.getLogicalType();
-      Type type = schema.getType();
-      if (customizedLogicalType != null && !customizedLogicalType.isEmpty()) {
-        processor = 
AVRO_LOGICAL_TYPE_FIELD_PROCESSORS.get(customizedLogicalType);
-      } else if (logicalType != null) {
-        processor = 
AVRO_LOGICAL_TYPE_FIELD_PROCESSORS.get(logicalType.getName());
-      } else {
-        processor = AVRO_TYPE_FIELD_TYPE_PROCESSORS.get(type);
-      }
+    ValidationUtils.checkArgument(
+        processor != null, String.format("JsonConverter cannot handle type: 
%s", type));
+    return processor;
+  }
 
-      ValidationUtils.checkArgument(
-          processor != null, String.format("JsonConverter cannot handle type: 
%s", type));
-      return processor;
-    }
+  /**
+   * Build type processor map for each avro type.
+   */
+  private Map<Schema.Type, JsonFieldProcessor> getFieldTypeProcessors() {
+    Map<Schema.Type, JsonFieldProcessor> fieldTypeProcessors = new 
EnumMap<>(Schema.Type.class);
+    fieldTypeProcessors.put(Type.STRING, generateStringTypeHandler());
+    fieldTypeProcessors.put(Type.BOOLEAN, generateBooleanTypeHandler());
+    fieldTypeProcessors.put(Type.DOUBLE, generateDoubleTypeHandler());
+    fieldTypeProcessors.put(Type.FLOAT, generateFloatTypeHandler());
+    fieldTypeProcessors.put(Type.INT, generateIntTypeHandler());
+    fieldTypeProcessors.put(Type.LONG, generateLongTypeHandler());
+    fieldTypeProcessors.put(Type.ARRAY, generateArrayTypeHandler());
+    fieldTypeProcessors.put(Type.RECORD, generateRecordTypeHandler());
+    fieldTypeProcessors.put(Type.ENUM, generateEnumTypeHandler());
+    fieldTypeProcessors.put(Type.MAP, generateMapTypeHandler());
+    fieldTypeProcessors.put(Type.BYTES, generateBytesTypeHandler());
+    fieldTypeProcessors.put(Type.FIXED, generateFixedTypeHandler());
+    return Collections.unmodifiableMap(fieldTypeProcessors);
+  }
 
-    // Avro primitive and complex type processors.
-    private static final Map<Schema.Type, JsonToAvroFieldProcessor> 
AVRO_TYPE_FIELD_TYPE_PROCESSORS = getFieldTypeProcessors();
-    // Avro logical type processors.
-    private static final Map<String, JsonToAvroFieldProcessor> 
AVRO_LOGICAL_TYPE_FIELD_PROCESSORS = getLogicalFieldTypeProcessors();
+  private Map<String, JsonFieldProcessor> getLogicalFieldTypeProcessors() {
+    return CollectionUtils.createImmutableMap(
+      Pair.of(AvroLogicalTypeEnum.DECIMAL.getValue(), 
generateDecimalLogicalTypeHandler()),
+      Pair.of(AvroLogicalTypeEnum.TIME_MICROS.getValue(), 
generateTimeMicroLogicalTypeHandler()),
+      Pair.of(AvroLogicalTypeEnum.TIME_MILLIS.getValue(), 
generateTimeMilliLogicalTypeHandler()),
+      Pair.of(AvroLogicalTypeEnum.DATE.getValue(), 
generateDateLogicalTypeHandler()),
+      Pair.of(AvroLogicalTypeEnum.LOCAL_TIMESTAMP_MICROS.getValue(), 
generateLocalTimeStampMicroLogicalTypeHandler()),
+      Pair.of(AvroLogicalTypeEnum.LOCAL_TIMESTAMP_MILLIS.getValue(), 
generateLocalTimeStampMilliLogicalTypeHandler()),
+      Pair.of(AvroLogicalTypeEnum.TIMESTAMP_MICROS.getValue(), 
generateTimestampMicroLogicalTypeHandler()),
+      Pair.of(AvroLogicalTypeEnum.TIMESTAMP_MILLIS.getValue(), 
generateTimestampMilliLogicalTypeHandler()),
+      Pair.of(AvroLogicalTypeEnum.DURATION.getValue(), 
generateDurationLogicalTypeHandler()),
+      Pair.of(AvroLogicalTypeEnum.UUID.getValue(), 
generateStringTypeHandler()));
+  }
 
-    /**
-     * Build type processor map for each avro type.
-     */
-    private static Map<Schema.Type, JsonToAvroFieldProcessor> 
getFieldTypeProcessors() {
-      Map<Schema.Type, JsonToAvroFieldProcessor> fieldTypeProcessors = new 
EnumMap<>(Schema.Type.class);
-      fieldTypeProcessors.put(Type.STRING, generateStringTypeHandler());
-      fieldTypeProcessors.put(Type.BOOLEAN, generateBooleanTypeHandler());
-      fieldTypeProcessors.put(Type.DOUBLE, generateDoubleTypeHandler());
-      fieldTypeProcessors.put(Type.FLOAT, generateFloatTypeHandler());
-      fieldTypeProcessors.put(Type.INT, generateIntTypeHandler());
-      fieldTypeProcessors.put(Type.LONG, generateLongTypeHandler());
-      fieldTypeProcessors.put(Type.ARRAY, generateArrayTypeHandler());
-      fieldTypeProcessors.put(Type.RECORD, generateRecordTypeHandler());
-      fieldTypeProcessors.put(Type.ENUM, generateEnumTypeHandler());
-      fieldTypeProcessors.put(Type.MAP, generateMapTypeHandler());
-      fieldTypeProcessors.put(Type.BYTES, generateBytesTypeHandler());
-      fieldTypeProcessors.put(Type.FIXED, generateFixedTypeHandler());
-      return Collections.unmodifiableMap(fieldTypeProcessors);
-    }
+  protected JsonFieldProcessor generateDecimalLogicalTypeHandler() {
+    return new DecimalToAvroLogicalTypeProcessor();
+  }
 
-    private static Map<String, JsonToAvroFieldProcessor> 
getLogicalFieldTypeProcessors() {
-      return CollectionUtils.createImmutableMap(
-        Pair.of(AvroLogicalTypeEnum.DECIMAL.getValue(), new 
DecimalLogicalTypeProcessor()),
-        Pair.of(AvroLogicalTypeEnum.TIME_MICROS.getValue(), new 
TimeMicroLogicalTypeProcessor()),
-        Pair.of(AvroLogicalTypeEnum.TIME_MILLIS.getValue(), new 
TimeMilliLogicalTypeProcessor()),
-        Pair.of(AvroLogicalTypeEnum.DATE.getValue(), new 
DateLogicalTypeProcessor()),
-        Pair.of(AvroLogicalTypeEnum.LOCAL_TIMESTAMP_MICROS.getValue(), new 
LocalTimestampMicroLogicalTypeProcessor()),
-        Pair.of(AvroLogicalTypeEnum.LOCAL_TIMESTAMP_MILLIS.getValue(), new 
LocalTimestampMilliLogicalTypeProcessor()),
-        Pair.of(AvroLogicalTypeEnum.TIMESTAMP_MICROS.getValue(), new 
TimestampMicroLogicalTypeProcessor()),
-        Pair.of(AvroLogicalTypeEnum.TIMESTAMP_MILLIS.getValue(), new 
TimestampMilliLogicalTypeProcessor()),
-        Pair.of(AvroLogicalTypeEnum.DURATION.getValue(), new 
DurationLogicalTypeProcessor()),
-        Pair.of(AvroLogicalTypeEnum.UUID.getValue(), 
generateStringTypeHandler()));
-    }
+  protected JsonFieldProcessor generateTimeMicroLogicalTypeHandler() {
+    return new TimeMicroLogicalTypeProcessor();
+  }
 
-    private static class DecimalLogicalTypeProcessor extends 
JsonToAvroFieldProcessor {
-      @Override
-      public Pair<Boolean, Object> convert(Object value, String name, Schema 
schema, boolean shouldSanitize, String invalidCharMask) {
+  protected JsonFieldProcessor generateTimeMilliLogicalTypeHandler() {
+    return new TimeMilliLogicalTypeProcessor();
+  }
 
-        if (!isValidDecimalTypeConfig(schema)) {
-          return Pair.of(false, null);
-        }
+  protected JsonFieldProcessor generateDateLogicalTypeHandler() {
+    return new DateToAvroLogicalTypeProcessor();
+  }
 
-        // Case 1: Input is a list. It is expected to be raw Fixed byte array 
input, and we only support
-        // parsing it to Fixed avro type.
-        if (value instanceof List<?> && schema.getType() == Type.FIXED) {
-          JsonToAvroFieldProcessor processor = generateFixedTypeHandler();
-          return processor.convert(value, name, schema, shouldSanitize, 
invalidCharMask);
-        }
+  protected JsonFieldProcessor generateLocalTimeStampMicroLogicalTypeHandler() 
{
+    return new LocalTimestampMicroLogicalTypeProcessor();
+  }
 
-        // Case 2: Input is a number or String number.
-        LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) 
schema.getLogicalType();
-        Pair<Boolean, BigDecimal> parseResult = parseObjectToBigDecimal(value);
-        if (Boolean.FALSE.equals(parseResult.getLeft())) {
-          return Pair.of(false, null);
-        }
-        BigDecimal bigDecimal = parseResult.getRight();
-
-        // As we don't do rounding, the validation will enforce the scale part 
and the integer part are all within the
-        // limit. As a result, if scale is 2 precision is 5, we only allow 3 
digits for the integer.
-        // Allowed: 123.45, 123, 0.12
-        // Disallowed: 1234 (4 digit integer while the scale has already 
reserved 2 digit out of the 5 digit precision)
-        //             123456, 0.12345
-        if (bigDecimal.scale() > decimalType.getScale()
-            || (bigDecimal.precision() - bigDecimal.scale()) > 
(decimalType.getPrecision() - decimalType.getScale())) {
-          // Correspond to case
-          // org.apache.avro.AvroTypeException: Cannot encode decimal with 
scale 5 as scale 2 without rounding.
-          // org.apache.avro.AvroTypeException: Cannot encode decimal with 
scale 3 as scale 2 without rounding
-          return Pair.of(false, null);
-        }
+  protected JsonFieldProcessor generateLocalTimeStampMilliLogicalTypeHandler() 
{
+    return new LocalTimestampMilliLogicalTypeProcessor();
+  }
 
-        switch (schema.getType()) {
-          case BYTES:
-            // Convert to primitive Arvo type that logical type Decimal uses.
-            ByteBuffer byteBuffer = new 
Conversions.DecimalConversion().toBytes(bigDecimal, schema, decimalType);
-            return Pair.of(true, byteBuffer);
-          case FIXED:
-            GenericFixed fixedValue = new 
Conversions.DecimalConversion().toFixed(bigDecimal, schema, decimalType);
-            return Pair.of(true, fixedValue);
-          default: {
-            return Pair.of(false, null);
-          }
-        }
-      }
+  protected JsonFieldProcessor generateTimestampMicroLogicalTypeHandler() {
+    return new TimestampMicroLogicalTypeProcessor();
+  }
 
-      /**
-       * Check if the given schema is a valid decimal type configuration.
-       */
-      private static boolean isValidDecimalTypeConfig(Schema schema) {
-        LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) 
schema.getLogicalType();
-        // At the time when the schema is found not valid when it is parsed, 
the Avro Schema.parse will just silently
-        // set the schema to be null instead of throwing exceptions. 
Correspondingly, we just check if it is null here.
-        if (decimalType == null) {
-          return false;
-        }
-        // Even though schema is validated at schema parsing phase, still 
validate here to be defensive.
-        decimalType.validate(schema);
-        return true;
-      }
+  protected JsonFieldProcessor generateTimestampMilliLogicalTypeHandler() {
+    return new TimestampMilliLogicalTypeProcessor();
+  }
 
-      /**
-       * Parse the object to BigDecimal.
-       *
-       * @param obj Object to be parsed
-       * @return Pair object, with left as boolean indicating if the parsing 
was successful and right as the
-       * BigDecimal value.
-       */
-      private static Pair<Boolean, BigDecimal> parseObjectToBigDecimal(Object 
obj) {
-        // Case 1: Object is a number.
-        if (obj instanceof Number) {
-          return Pair.of(true, BigDecimal.valueOf(((Number) 
obj).doubleValue()));
-        }
+  protected JsonFieldProcessor generateDurationLogicalTypeHandler() {
+    return new DurationToAvroLogicalTypeProcessor();
+  }
 
-        // Case 2: Object is a number in String format.
-        if (obj instanceof String) {
-          BigDecimal bigDecimal = null;
-          try {
-            bigDecimal = new BigDecimal(((String) obj));
-          } catch (java.lang.NumberFormatException ignored) {
-            /* ignore */
-          }
-          return Pair.of(bigDecimal != null, bigDecimal);
-        }
+  private class DecimalToAvroLogicalTypeProcessor extends 
DecimalLogicalTypeProcessor {
+    @Override
+    public Pair<Boolean, Object> convert(Object value, String name, Schema 
schema) {
+      if (!isValidDecimalTypeConfig(schema)) {
         return Pair.of(false, null);
       }
-    }
-
-    private static class DurationLogicalTypeProcessor extends 
JsonToAvroFieldProcessor {
-      private static final int NUM_ELEMENTS_FOR_DURATION_TYPE = 3;
-
-      /**
-       * We expect the input to be a list of 3 integers representing months, 
days and milliseconds.
-       */
-      private boolean isValidDurationInput(Object value) {
-        if (!(value instanceof List<?>)) {
-          return false;
-        }
-        List<?> list = (List<?>) value;
-        if (list.size() != NUM_ELEMENTS_FOR_DURATION_TYPE) {
-          return false;
-        }
-        for (Object element : list) {
-          if (!(element instanceof Integer)) {
-            return false;
-          }
-        }
-        return true;
-      }
-
-      /**
-       * Convert the given object to Avro object with schema whose logical 
type is duration.
-       */
-      @Override
-      public Pair<Boolean, Object> convert(Object value, String name, Schema 
schema, boolean shouldSanitize, String invalidCharMask) {
 
-        if (!isValidDurationTypeConfig(schema)) {
-          return Pair.of(false, null);
-        }
-        if (!isValidDurationInput(value)) {
-          return Pair.of(false, null);
-        }
-        // After the validation the input can be safely cast to List<Integer> 
with 3 elements.
-        List<?> list = (List<?>) value;
-        List<Integer> converval = list.stream()
-            .filter(Integer.class::isInstance)
-            .map(Integer.class::cast)
-            .collect(Collectors.toList());
-
-        ByteBuffer buffer = 
ByteBuffer.allocate(schema.getFixedSize()).order(ByteOrder.LITTLE_ENDIAN);
-        for (Integer element : converval) {
-          buffer.putInt(element);  // months
-        }
-        return Pair.of(true, new GenericData.Fixed(schema, buffer.array()));
+      // Case 1: Input is a list. It is expected to be raw Fixed byte array 
input, and we only support
+      // parsing it to Fixed avro type.
+      if (value instanceof List<?> && schema.getType() == Type.FIXED) {
+        JsonFieldProcessor processor = generateFixedTypeHandler();
+        return processor.convert(value, name, schema);
       }
 
-      /**
-       * Check if the given schema is a valid decimal type configuration.
-       */
-      private static boolean isValidDurationTypeConfig(Schema schema) {
-        String durationTypeName = AvroLogicalTypeEnum.DURATION.getValue();
-        LogicalType durationType = schema.getLogicalType();
-        String durationTypeProp = schema.getProp("logicalType");
-        // 1. The Avro type should be "Fixed".
-        // 2. Fixed size must be of 12 bytes as it hold 3 integers.
-        // 3. Logical type name should be "duration". The name might be stored 
in different places based on Avro version
-        //    being used here.
-        return schema.getType().equals(Type.FIXED)
-            && schema.getFixedSize() == Integer.BYTES * 
NUM_ELEMENTS_FOR_DURATION_TYPE
-            && (durationType != null && 
durationType.getName().equals(durationTypeName)
-            || durationTypeProp != null && 
durationTypeProp.equals(durationTypeName));
+      // Case 2: Input is a number or String number.
+      LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) 
schema.getLogicalType();
+      Pair<Boolean, BigDecimal> parseResult = parseObjectToBigDecimal(value);
+      if (Boolean.FALSE.equals(parseResult.getLeft())) {
+        return Pair.of(false, null);
       }
-    }
-
-    /**
-     * Processor utility handling Number inputs. Consumed by 
TimeLogicalTypeProcessor. 
-     */
-    private interface NumericParser {
-      // Convert the input number to Avro data type according to the class
-      // implementing this interface.
-      Pair<Boolean, Object> handleNumberValue(Number value);
-
-      // Convert the input number to Avro data type according to the class
-      // implementing this interface.
-      // @param value the input number in string format.
-      Pair<Boolean, Object> handleStringNumber(String value);
-
-      interface IntParser extends NumericParser {
-        @Override
-        default Pair<Boolean, Object> handleNumberValue(Number value) {
-          return Pair.of(true, value.intValue());
-        }
-
-        @Override
-        default Pair<Boolean, Object> handleStringNumber(String value) {
-          return Pair.of(true, Integer.parseInt(value));
-        }
+      BigDecimal bigDecimal = parseResult.getRight();
+
+      // As we don't do rounding, the validation will enforce the scale part 
and the integer part are all within the
+      // limit. As a result, if scale is 2 precision is 5, we only allow 3 
digits for the integer.
+      // Allowed: 123.45, 123, 0.12
+      // Disallowed: 1234 (4 digit integer while the scale has already 
reserved 2 digit out of the 5 digit precision)
+      //             123456, 0.12345
+      if (bigDecimal.scale() > decimalType.getScale()
+          || (bigDecimal.precision() - bigDecimal.scale()) > 
(decimalType.getPrecision() - decimalType.getScale())) {
+        // Correspond to case
+        // org.apache.avro.AvroTypeException: Cannot encode decimal with scale 
5 as scale 2 without rounding.
+        // org.apache.avro.AvroTypeException: Cannot encode decimal with scale 
3 as scale 2 without rounding
+        return Pair.of(false, null);
       }
 
-      interface LongParser extends NumericParser {
-        @Override
-        default Pair<Boolean, Object> handleNumberValue(Number value) {
-          return Pair.of(true, value.longValue());
-        }
-
-        @Override
-        default Pair<Boolean, Object> handleStringNumber(String value) {
-          return Pair.of(true, Long.parseLong(value));
+      switch (schema.getType()) {
+        case BYTES:
+          // Convert to primitive Arvo type that logical type Decimal uses.
+          ByteBuffer byteBuffer = new 
Conversions.DecimalConversion().toBytes(bigDecimal, schema, decimalType);
+          return Pair.of(true, byteBuffer);
+        case FIXED:
+          GenericFixed fixedValue = new 
Conversions.DecimalConversion().toFixed(bigDecimal, schema, decimalType);
+          return Pair.of(true, fixedValue);
+        default: {
+          return Pair.of(false, null);
         }
       }
     }
+  }
+
+  private static class DurationToAvroLogicalTypeProcessor extends 
DurationLogicalTypeProcessor {
 
     /**
-     * Base Class for converting object to avro logical type 
TimeMilli/TimeMicro.
+     * Convert the given object to Avro object with schema whose logical type 
is duration.
      */
-    private abstract static class TimeLogicalTypeProcessor extends 
JsonToAvroFieldProcessor implements NumericParser {
-
-      protected static final LocalDateTime LOCAL_UNIX_EPOCH = 
LocalDateTime.of(1970, 1, 1, 0, 0, 0, 0);
-
-      // Logical type the processor is handling.
-      private final AvroLogicalTypeEnum logicalTypeEnum;
-
-      public TimeLogicalTypeProcessor(AvroLogicalTypeEnum logicalTypeEnum) {
-        this.logicalTypeEnum = logicalTypeEnum;
-      }
+    @Override
+    public Pair<Boolean, Object> convert(Object value, String name, Schema 
schema) {
 
-      /**
-       * Main function that convert input to Object with java data type 
specified by schema
-       */
-      @Override
-      public Pair<Boolean, Object> convert(Object value, String name, Schema 
schema, boolean shouldSanitize, String invalidCharMask) {
-        LogicalType logicalType = schema.getLogicalType();
-        if (logicalType == null) {
-          return Pair.of(false, null);
-        }
-        logicalType.validate(schema);
-        if (value instanceof Number) {
-          return handleNumberValue((Number) value);
-        }
-        if (value instanceof String) {
-          String valStr = (String) value;
-          if (ALL_DIGITS_WITH_OPTIONAL_SIGN.matcher(valStr).matches()) {
-            return handleStringNumber(valStr);
-          } else if (isWellFormedDateTime(valStr)) {
-            return handleStringValue(valStr);
-          }
-        }
+      if (!isValidDurationTypeConfig(schema)) {
         return Pair.of(false, null);
       }
-
-      // Handle the case when the input is a string that may be parsed as a 
time.
-      protected abstract Pair<Boolean, Object> handleStringValue(String value);
-
-      protected DateTimeFormatter getDateTimeFormatter() {
-        DateTimeParseContext ctx = 
DATE_TIME_PARSE_CONTEXT_MAP.get(logicalTypeEnum);
-        assert ctx != null : String.format("%s should have configured date 
time context.", logicalTypeEnum.getValue());
-        return ctx.dateTimeFormatter;
-      }
-
-      protected Pattern getDateTimePattern() {
-        DateTimeParseContext ctx = 
DATE_TIME_PARSE_CONTEXT_MAP.get(logicalTypeEnum);
-        assert ctx != null : String.format("%s should have configured date 
time context.", logicalTypeEnum.getValue());
-        return ctx.dateTimePattern;
-      }
-
-      // Depending on the logical type the processor handles, they use 
different parsing context
-      // when they need to parse a timestamp string in handleStringValue.
-      private static class DateTimeParseContext {
-        public DateTimeParseContext(DateTimeFormatter dateTimeFormatter, 
Pattern dateTimePattern) {
-          this.dateTimeFormatter = dateTimeFormatter;
-          this.dateTimePattern = dateTimePattern;
-        }
-
-        public final Pattern dateTimePattern;
-
-        public final DateTimeFormatter dateTimeFormatter;
-      }
-
-      private static final Map<AvroLogicalTypeEnum, DateTimeParseContext> 
DATE_TIME_PARSE_CONTEXT_MAP = getParseContext();
-
-      private static Map<AvroLogicalTypeEnum, DateTimeParseContext> 
getParseContext() {
-        // Formatter for parsing a timestamp. It assumes UTC timezone, with 
'Z' as the zone ID.
-        // No pattern is defined as ISO_INSTANT format internal is not clear.
-        DateTimeParseContext dateTimestampParseContext = new 
DateTimeParseContext(
-            DateTimeFormatter.ISO_INSTANT,
-            null /* match everything*/);
-        // Formatter for parsing a time of day. The pattern is derived from 
ISO_LOCAL_TIME definition.
-        // Pattern asserts the string is
-        // <optional sign><Hour>:<Minute> + optional <second> + optional 
<fractional second>
-        DateTimeParseContext dateTimeParseContext = new DateTimeParseContext(
-            DateTimeFormatter.ISO_LOCAL_TIME,
-            
Pattern.compile("^[+-]?\\d{2}:\\d{2}(?::\\d{2}(?:\\.\\d{1,9})?)?"));
-        // Formatter for parsing a timestamp in a local timezone. The pattern 
is derived from ISO_LOCAL_DATE_TIME definition.
-        // Pattern asserts the string is
-        // <optional sign><Year>-<Month>-<Day>T<Hour>:<Minute> + optional 
<second> + optional <fractional second>
-        DateTimeParseContext localTimestampParseContext = new 
DateTimeParseContext(
-            DateTimeFormatter.ISO_LOCAL_DATE_TIME,
-            
Pattern.compile("^[+-]?\\d{4,10}-\\d{2}-\\d{2}T\\d{2}:\\d{2}(?::\\d{2}(?:\\.\\d{1,9})?)?")
-        );
-        // Formatter for parsing a date. The pattern is derived from 
ISO_LOCAL_DATE definition.
-        // Pattern asserts the string is
-        // <optional sign><Year>-<Month>-<Day>
-        DateTimeParseContext localDateParseContext = new DateTimeParseContext(
-            DateTimeFormatter.ISO_LOCAL_DATE,
-            Pattern.compile("^[+-]?\\d{4,10}-\\d{2}-\\d{2}?")
-        );
-
-        EnumMap<AvroLogicalTypeEnum, DateTimeParseContext> ctx = new 
EnumMap<>(AvroLogicalTypeEnum.class);
-        ctx.put(AvroLogicalTypeEnum.TIME_MICROS, dateTimeParseContext);
-        ctx.put(AvroLogicalTypeEnum.TIME_MILLIS, dateTimeParseContext);
-        ctx.put(AvroLogicalTypeEnum.DATE, localDateParseContext);
-        ctx.put(AvroLogicalTypeEnum.LOCAL_TIMESTAMP_MICROS, 
localTimestampParseContext);
-        ctx.put(AvroLogicalTypeEnum.LOCAL_TIMESTAMP_MILLIS, 
localTimestampParseContext);
-        ctx.put(AvroLogicalTypeEnum.TIMESTAMP_MICROS, 
dateTimestampParseContext);
-        ctx.put(AvroLogicalTypeEnum.TIMESTAMP_MILLIS, 
dateTimestampParseContext);
-        return Collections.unmodifiableMap(ctx);
-      }
-
-      // Pattern validating if it is an number in string form.
-      // Only check at most 19 digits as this is the max num of digits for 
LONG.MAX_VALUE to contain the cost of regex matching.
-      protected static final Pattern ALL_DIGITS_WITH_OPTIONAL_SIGN = 
Pattern.compile("^[-+]?\\d{1,19}$");
-
-      /**
-       * Check if the given string is a well-formed date time string.
-       * If no pattern is defined, it will always return true.
-       */
-      private boolean isWellFormedDateTime(String value) {
-        Pattern pattern = getDateTimePattern();
-        return pattern == null || pattern.matcher(value).matches();
-      }
-
-      protected Pair<Boolean, Instant> convertToInstantTime(String input) {
-        // Parse the input timestamp, DateTimeFormatter.ISO_INSTANT is implied 
here
-        Instant time = null;
-        try {
-          time = Instant.parse(input);
-        } catch (DateTimeParseException ignore) {
-          /* ignore */
-        }
-        return Pair.of(time != null, time);
-      }
-
-      protected Pair<Boolean, LocalTime> convertToLocalTime(String input) {
-        // Parse the input timestamp, DateTimeFormatter.ISO_LOCAL_TIME is 
implied here
-        LocalTime time = null;
-        try {
-          // Try parsing as an ISO date
-          time = LocalTime.parse(input);
-        } catch (DateTimeParseException ignore) {
-          /* ignore */
-        }
-        return Pair.of(time != null, time);
+      if (!isValidDurationInput(value)) {
+        return Pair.of(false, null);
       }
+      // After the validation the input can be safely cast to List<Integer> 
with 3 elements.
+      List<?> list = (List<?>) value;
+      List<Integer> converval = list.stream()
+          .filter(Integer.class::isInstance)
+          .map(Integer.class::cast)
+          .collect(Collectors.toList());
 
-      protected Pair<Boolean, LocalDateTime> convertToLocalDateTime(String 
input) {
-        // Parse the input timestamp, DateTimeFormatter.ISO_LOCAL_DATE_TIME is 
implied here
-        LocalDateTime time = null;
-        try {
-          // Try parsing as an ISO date
-          time = LocalDateTime.parse(input, getDateTimeFormatter());
-        } catch (DateTimeParseException ignore) {
-          /* ignore */
-        }
-        return Pair.of(time != null, time);
+      ByteBuffer buffer = 
ByteBuffer.allocate(schema.getFixedSize()).order(ByteOrder.LITTLE_ENDIAN);
+      for (Integer element : converval) {
+        buffer.putInt(element);  // months, days, milliseconds (in list order)
       }
+      return Pair.of(true, new GenericData.Fixed(schema, buffer.array()));
     }
+  }
 
-    private static class DateLogicalTypeProcessor extends 
TimeLogicalTypeProcessor
-        implements NumericParser.IntParser {
-      public DateLogicalTypeProcessor() {
-        super(AvroLogicalTypeEnum.DATE);
-      }
-
-      @Override
-      public Pair<Boolean, Object> handleStringValue(String value) {
-        Pair<Boolean, LocalDate> result = convertToLocalDate(value);
-        if (!result.getLeft()) {
-          return Pair.of(false, null);
-        }
-        LocalDate date = result.getRight();
-        int daysSinceEpoch = (int) 
ChronoUnit.DAYS.between(LocalDate.ofEpochDay(0), date);
-        return Pair.of(true, daysSinceEpoch);
-      }
+  private static class DateToAvroLogicalTypeProcessor extends 
DateLogicalTypeProcessor {
 
-      private Pair<Boolean, LocalDate> convertToLocalDate(String input) {
-        // Parse the input date string, DateTimeFormatter.ISO_LOCAL_DATE is 
implied here
-        LocalDate date = null;
-        try {
-          // Try parsing as an ISO date
-          date = LocalDate.parse(input);
-        } catch (DateTimeParseException ignore) {
-          /* ignore */
-        }
-        return Pair.of(date != null, date);
-      }
+    @Override
+    public Pair<Boolean, Object> convert(
+        Object value, String name, Schema schema) {
+      return convertCommon(
+          new Parser.IntParser() {
+            @Override
+            public Pair<Boolean, Object> handleStringValue(String value) {
+              if (!isWellFormedDateTime(value)) {
+                return Pair.of(false, null);
+              }
+              Pair<Boolean, LocalDate> result = convertToLocalDate(value);
+              if (!result.getLeft()) {
+                return Pair.of(false, null);
+              }
+              LocalDate date = result.getRight();
+              int daysSinceEpoch = (int) 
ChronoUnit.DAYS.between(LocalDate.ofEpochDay(0), date);
+              return Pair.of(true, daysSinceEpoch);
+            }
+          },
+          value, schema);
     }
+  }
 
-    /**
-     * Processor for TimeMilli logical type.
-     */
-    private static class TimeMilliLogicalTypeProcessor extends 
TimeLogicalTypeProcessor
-        implements NumericParser.IntParser {
-      public TimeMilliLogicalTypeProcessor() {
-        super(AvroLogicalTypeEnum.TIME_MILLIS);
-      }
-
+  protected JsonFieldProcessor generateBooleanTypeHandler() {
+    return new JsonFieldProcessor() {
       @Override
-      public Pair<Boolean, Object> handleStringValue(String value) {
-        Pair<Boolean, LocalTime> result = convertToLocalTime(value);
-        if (!result.getLeft()) {
-          return Pair.of(false, null);
+      public Pair<Boolean, Object> convert(Object value, String name, Schema 
schema) {
+        if (value instanceof Boolean) {
+          return Pair.of(true, value);
         }
-        LocalTime time = result.getRight();
-        Integer millisOfDay = time.toSecondOfDay() * 1000 + time.getNano() / 
1000000;
-        return Pair.of(true, millisOfDay);
-      }
-    }
-
-    /**
-     * Processor for TimeMicro logical type.
-     */
-    private static class TimeMicroLogicalTypeProcessor extends 
TimeLogicalTypeProcessor
-        implements NumericParser.LongParser {
-      public TimeMicroLogicalTypeProcessor() {
-        super(AvroLogicalTypeEnum.TIME_MICROS);
+        return Pair.of(false, null);
       }
+    };
+  }
 
+  protected JsonFieldProcessor generateIntTypeHandler() {
+    return new JsonFieldProcessor() {
       @Override
-      public Pair<Boolean, Object> handleStringValue(String value) {
-        Pair<Boolean, LocalTime> result = convertToLocalTime(value);
-        if (!result.getLeft()) {
-          return Pair.of(false, null);
+      public Pair<Boolean, Object> convert(Object value, String name, Schema 
schema) {
+        if (value instanceof Number) {
+          return Pair.of(true, ((Number) value).intValue());
+        } else if (value instanceof String) {
+          return Pair.of(true, Integer.valueOf((String) value));
         }
-        LocalTime time = result.getRight();
-        Long microsOfDay = (long) time.toSecondOfDay() * 1000000 + 
time.getNano() / 1000;
-        return Pair.of(true, microsOfDay);
-      }
-    }
-
-    /**
-     * Processor for TimeMicro logical type.
-     */
-    private static class LocalTimestampMicroLogicalTypeProcessor extends 
TimeLogicalTypeProcessor
-        implements NumericParser.LongParser {
-      public LocalTimestampMicroLogicalTypeProcessor() {
-        super(AvroLogicalTypeEnum.LOCAL_TIMESTAMP_MICROS);
+        return Pair.of(false, null);
       }
+    };
+  }
 
+  protected JsonFieldProcessor generateDoubleTypeHandler() {
+    return new JsonFieldProcessor() {
       @Override
-      public Pair<Boolean, Object> handleStringValue(String value) {
-        Pair<Boolean, LocalDateTime> result = convertToLocalDateTime(value);
-        if (!result.getLeft()) {
-          return Pair.of(false, null);
+      public Pair<Boolean, Object> convert(Object value, String name, Schema 
schema) {
+        if (value instanceof Number) {
+          return Pair.of(true, ((Number) value).doubleValue());
+        } else if (value instanceof String) {
+          return Pair.of(true, Double.valueOf((String) value));
         }
-        LocalDateTime time = result.getRight();
-
-        // Calculate the difference in milliseconds
-        long diffInMicros = LOCAL_UNIX_EPOCH.until(time, 
ChronoField.MICRO_OF_SECOND.getBaseUnit());
-        return Pair.of(true, diffInMicros);
-      }
-    }
-
-    /**
-     * Processor for TimeMicro logical type.
-     */
-    private static class TimestampMicroLogicalTypeProcessor extends 
TimeLogicalTypeProcessor
-        implements NumericParser.LongParser {
-      public TimestampMicroLogicalTypeProcessor() {
-        super(AvroLogicalTypeEnum.TIMESTAMP_MICROS);
+        return Pair.of(false, null);
       }
+    };
+  }
 
+  protected JsonFieldProcessor generateFloatTypeHandler() {
+    return new JsonFieldProcessor() {
       @Override
-      public Pair<Boolean, Object> handleStringValue(String value) {
-        Pair<Boolean, Instant> result = convertToInstantTime(value);
-        if (!result.getLeft()) {
-          return Pair.of(false, null);
+      public Pair<Boolean, Object> convert(Object value, String name, Schema 
schema) {
+        if (value instanceof Number) {
+          return Pair.of(true, ((Number) value).floatValue());
+        } else if (value instanceof String) {
+          return Pair.of(true, Float.valueOf((String) value));
         }
-        Instant time = result.getRight();
-
-        // Calculate the difference in milliseconds
-        long diffInMicro = Instant.EPOCH.until(time, 
ChronoField.MICRO_OF_SECOND.getBaseUnit());
-        return Pair.of(true, diffInMicro);
-      }
-    }
-
-    /**
-     * Processor for TimeMicro logical type.
-     */
-    private static class LocalTimestampMilliLogicalTypeProcessor extends 
TimeLogicalTypeProcessor
-        implements NumericParser.LongParser {
-      public LocalTimestampMilliLogicalTypeProcessor() {
-        super(AvroLogicalTypeEnum.LOCAL_TIMESTAMP_MILLIS);
+        return Pair.of(false, null);
       }
+    };
+  }
 
+  protected JsonFieldProcessor generateLongTypeHandler() {
+    return new JsonFieldProcessor() {
       @Override
-      public Pair<Boolean, Object> handleStringValue(String value) {
-        Pair<Boolean, LocalDateTime> result = convertToLocalDateTime(value);
-        if (!result.getLeft()) {
-          return Pair.of(false, null);
+      public Pair<Boolean, Object> convert(Object value, String name, Schema 
schema) {
+        if (value instanceof Number) {
+          return Pair.of(true, ((Number) value).longValue());
+        } else if (value instanceof String) {
+          return Pair.of(true, Long.valueOf((String) value));
         }
-        LocalDateTime time = result.getRight();
-
-        // Calculate the difference in milliseconds
-        long diffInMillis = LOCAL_UNIX_EPOCH.until(time, 
ChronoField.MILLI_OF_SECOND.getBaseUnit());
-        return Pair.of(true, diffInMillis);
+        return Pair.of(false, null);
       }
-    }
+    };
+  }
 
-    /**
-     * Processor for TimeMicro logical type.
-     */
-    private static class TimestampMilliLogicalTypeProcessor extends 
TimeLogicalTypeProcessor
-        implements NumericParser.LongParser {
-      public TimestampMilliLogicalTypeProcessor() {
-        super(AvroLogicalTypeEnum.TIMESTAMP_MILLIS);
+  protected JsonFieldProcessor generateStringTypeHandler() {
+    return new JsonFieldProcessor() {
+      @Override
+      public Pair<Boolean, Object> convert(Object value, String name, Schema 
schema) {
+        return Pair.of(true, value.toString());
       }
+    };
+  }
 
+  protected JsonFieldProcessor generateBytesTypeHandler() {
+    return new JsonFieldProcessor() {
       @Override
-      public Pair<Boolean, Object> handleStringValue(String value) {
-        Pair<Boolean, Instant> result = convertToInstantTime(value);
-        if (!result.getLeft()) {
-          return Pair.of(false, null);
-        }
-        Instant time = result.getRight();
-
-        // Calculate the difference in milliseconds
-        long diffInMillis = Instant.EPOCH.until(time, 
ChronoField.MILLI_OF_SECOND.getBaseUnit());
-        return Pair.of(true, diffInMillis);
+      public Pair<Boolean, Object> convert(Object value, String name, Schema 
schema) {
+        // Should return ByteBuffer (see GenericData.isBytes())
+        return Pair.of(true, ByteBuffer.wrap(value.toString().getBytes()));
       }
-    }
-
-    private static JsonToAvroFieldProcessor generateBooleanTypeHandler() {
-      return new JsonToAvroFieldProcessor() {
-        @Override
-        public Pair<Boolean, Object> convert(Object value, String name, Schema 
schema, boolean shouldSanitize, String invalidCharMask) {
-          if (value instanceof Boolean) {
-            return Pair.of(true, value);
-          }
-          return Pair.of(false, null);
-        }
-      };
-    }
-
-    private static JsonToAvroFieldProcessor generateIntTypeHandler() {
-      return new JsonToAvroFieldProcessor() {
-        @Override
-        public Pair<Boolean, Object> convert(Object value, String name, Schema 
schema, boolean shouldSanitize, String invalidCharMask) {
-          if (value instanceof Number) {
-            return Pair.of(true, ((Number) value).intValue());
-          } else if (value instanceof String) {
-            return Pair.of(true, Integer.valueOf((String) value));
-          }
-          return Pair.of(false, null);
-        }
-      };
-    }
-
-    private static JsonToAvroFieldProcessor generateDoubleTypeHandler() {
-      return new JsonToAvroFieldProcessor() {
-        @Override
-        public Pair<Boolean, Object> convert(Object value, String name, Schema 
schema, boolean shouldSanitize, String invalidCharMask) {
-          if (value instanceof Number) {
-            return Pair.of(true, ((Number) value).doubleValue());
-          } else if (value instanceof String) {
-            return Pair.of(true, Double.valueOf((String) value));
-          }
-          return Pair.of(false, null);
-        }
-      };
-    }
-
-    private static JsonToAvroFieldProcessor generateFloatTypeHandler() {
-      return new JsonToAvroFieldProcessor() {
-        @Override
-        public Pair<Boolean, Object> convert(Object value, String name, Schema 
schema, boolean shouldSanitize, String invalidCharMask) {
-          if (value instanceof Number) {
-            return Pair.of(true, ((Number) value).floatValue());
-          } else if (value instanceof String) {
-            return Pair.of(true, Float.valueOf((String) value));
-          }
-          return Pair.of(false, null);
-        }
-      };
-    }
+    };
+  }
 
-    private static JsonToAvroFieldProcessor generateLongTypeHandler() {
-      return new JsonToAvroFieldProcessor() {
-        @Override
-        public Pair<Boolean, Object> convert(Object value, String name, Schema 
schema, boolean shouldSanitize, String invalidCharMask) {
-          if (value instanceof Number) {
-            return Pair.of(true, ((Number) value).longValue());
-          } else if (value instanceof String) {
-            return Pair.of(true, Long.valueOf((String) value));
-          }
-          return Pair.of(false, null);
-        }
-      };
-    }
+  protected JsonFieldProcessor generateFixedTypeHandler() {
+    return new AvroFixedTypeProcessor();
+  }
 
-    private static JsonToAvroFieldProcessor generateStringTypeHandler() {
-      return new JsonToAvroFieldProcessor() {
-        @Override
-        public Pair<Boolean, Object> convert(Object value, String name, Schema 
schema, boolean shouldSanitize, String invalidCharMask) {
-          return Pair.of(true, value.toString());
-        }
-      };
+  private static class AvroFixedTypeProcessor extends FixedTypeProcessor {
+    @Override
+    public Pair<Boolean, Object> convert(Object value, String name, Schema 
schema) {
+      return Pair.of(true, new GenericData.Fixed(
+          schema, convertToJavaObject(value, name, schema)));
     }
+  }
 
-    private static JsonToAvroFieldProcessor generateBytesTypeHandler() {
-      return new JsonToAvroFieldProcessor() {
-        @Override
-        public Pair<Boolean, Object> convert(Object value, String name, Schema 
schema, boolean shouldSanitize, String invalidCharMask) {
-          // Should return ByteBuffer (see GenericData.isBytes())
-          return Pair.of(true, ByteBuffer.wrap(value.toString().getBytes()));
-        }
-      };
+  private static class AvroEnumTypeProcessor extends EnumTypeProcessor {
+    @Override
+    public Pair<Boolean, Object> convert(Object value, String name, Schema 
schema) {
+      return Pair.of(true, new GenericData.EnumSymbol(schema, 
convertToJavaObject(value, name, schema)));
     }
+  }
 
-    private static JsonToAvroFieldProcessor generateFixedTypeHandler() {
-      return new JsonToAvroFieldProcessor() {
-        @Override
-        public Pair<Boolean, Object> convert(Object value, String name, Schema 
schema, boolean shouldSanitize, String invalidCharMask) {
-          // The ObjectMapper use List to represent FixedType
-          // eg: "decimal_val": [0, 0, 14, -63, -52] will convert to 
ArrayList<Integer>
-          List<Integer> converval = (List<Integer>) value;
-          byte[] src = new byte[converval.size()];
-          for (int i = 0; i < converval.size(); i++) {
-            src[i] = converval.get(i).byteValue();
-          }
-          byte[] dst = new byte[schema.getFixedSize()];
-          System.arraycopy(src, 0, dst, 0, Math.min(schema.getFixedSize(), 
src.length));
-          return Pair.of(true, new GenericData.Fixed(schema, dst));
-        }
-      };
-    }
+  protected JsonFieldProcessor generateEnumTypeHandler() {
+    return new AvroEnumTypeProcessor();
+  }
 
-    private static JsonToAvroFieldProcessor generateEnumTypeHandler() {
-      return new JsonToAvroFieldProcessor() {
-        @Override
-        public Pair<Boolean, Object> convert(Object value, String name, Schema 
schema, boolean shouldSanitize, String invalidCharMask) {
-          if (schema.getEnumSymbols().contains(value.toString())) {
-            return Pair.of(true, new GenericData.EnumSymbol(schema, 
value.toString()));
-          }
-          throw new HoodieJsonToAvroConversionException(String.format("Symbol 
%s not in enum", value.toString()),
-              schema.getFullName(), schema, shouldSanitize, invalidCharMask);
-        }
-      };
-    }
+  protected JsonFieldProcessor generateRecordTypeHandler() {
+    return new JsonFieldProcessor() {
+      @Override
+      public Pair<Boolean, Object> convert(Object value, String name, Schema 
schema) {
+        return Pair.of(true, convertJsonToAvro((Map<String, Object>) value, 
schema));
+      }
+    };
+  }
 
-    private static JsonToAvroFieldProcessor generateRecordTypeHandler() {
-      return new JsonToAvroFieldProcessor() {
-        @Override
-        public Pair<Boolean, Object> convert(Object value, String name, Schema 
schema, boolean shouldSanitize, String invalidCharMask) {
-          return Pair.of(true, convertJsonToAvro((Map<String, Object>) value, 
schema, shouldSanitize, invalidCharMask));
+  protected JsonFieldProcessor generateArrayTypeHandler() {
+    return new JsonFieldProcessor() {
+      private List<Object> convertToJavaObject(Object value, String name, 
Schema schema) {
+        Schema elementSchema = schema.getElementType();
+        List<Object> listRes = new ArrayList<>();
+        for (Object v : (List) value) {
+          listRes.add(convertJsonField(v, name, elementSchema));
         }
-      };
-    }
+        return listRes;
+      }
 
-    private static JsonToAvroFieldProcessor generateArrayTypeHandler() {
-      return new JsonToAvroFieldProcessor() {
-        @Override
-        public Pair<Boolean, Object> convert(Object value, String name, Schema 
schema, boolean shouldSanitize, String invalidCharMask) {
-          Schema elementSchema = schema.getElementType();
-          List<Object> listRes = new ArrayList<>();
-          for (Object v : (List) value) {
-            listRes.add(convertJsonToAvroField(v, name, elementSchema, 
shouldSanitize, invalidCharMask));
-          }
-          return Pair.of(true, new GenericData.Array<>(schema, listRes));
-        }
-      };
-    }
+      @Override
+      public Pair<Boolean, Object> convert(Object value, String name, Schema 
schema) {
+        return Pair.of(true, new GenericData.Array<>(
+            schema,
+            convertToJavaObject(
+                value,
+                name,
+                schema)));
+      }
+    };
+  }
 
-    private static JsonToAvroFieldProcessor generateMapTypeHandler() {
-      return new JsonToAvroFieldProcessor() {
-        @Override
-        public Pair<Boolean, Object> convert(Object value, String name, Schema 
schema, boolean shouldSanitize, String invalidCharMask) {
-          Schema valueSchema = schema.getValueType();
-          Map<String, Object> mapRes = new HashMap<>();
-          for (Map.Entry<String, Object> v : ((Map<String, Object>) 
value).entrySet()) {
-            mapRes.put(v.getKey(), convertJsonToAvroField(v.getValue(), name, 
valueSchema, shouldSanitize, invalidCharMask));
-          }
-          return Pair.of(true, mapRes);
+  protected JsonFieldProcessor generateMapTypeHandler() {
+    return new JsonFieldProcessor() {
+      @Override
+      public Pair<Boolean, Object> convert(Object value, String name, Schema 
schema) {
+        Schema valueSchema = schema.getValueType();
+        Map<String, Object> mapRes = new HashMap<>();
+        for (Map.Entry<String, Object> v : ((Map<String, Object>) 
value).entrySet()) {
+          mapRes.put(v.getKey(), convertJsonField(v.getValue(), name, 
valueSchema));
         }
-      };
-    }
+        return Pair.of(true, mapRes);
+      }
+    };
   }
 
-  /**
-   * Exception Class for any schema conversion issue.
-   */
-  public static class HoodieJsonToAvroConversionException extends 
HoodieException {
-
-    private final Object value;
-
-    private final String fieldName;
-    private final Schema schema;
-    private final boolean shouldSanitize;
-    private final String invalidCharMask;
-
-    public HoodieJsonToAvroConversionException(Object value, String fieldName, 
Schema schema, boolean shouldSanitize, String invalidCharMask) {
-      this.value = value;
-      this.fieldName = fieldName;
-      this.schema = schema;
-      this.shouldSanitize = shouldSanitize;
-      this.invalidCharMask = invalidCharMask;
-    }
-
-    @Override
-    public String toString() {
-      if (shouldSanitize) {
-        return String.format("Json to Avro Type conversion error for field %s, 
%s for %s. Field sanitization is enabled with a mask of %s.", fieldName, value, 
schema, invalidCharMask);
-      }
-      return String.format("Json to Avro Type conversion error for field %s, 
%s for %s", fieldName, value, schema);
+  protected HoodieJsonConversionException buildConversionException(Object 
value, String fieldName, Schema schema, boolean shouldSanitize, String 
invalidCharMask) {
+    String errorMsg;
+    if (shouldSanitize) {
+      errorMsg = String.format("Json to Avro Type conversion error for field 
%s, %s for %s. Field sanitization is enabled with a mask of %s.", fieldName, 
value, schema, invalidCharMask);
+    } else {
+      errorMsg = String.format("Json to Avro Type conversion error for field 
%s, %s for %s", fieldName, value, schema);
     }
+    return new HoodieJsonConversionException(errorMsg);
   }
-}
+
+}
\ No newline at end of file
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/avro/processors/DateLogicalTypeProcessor.java
 
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/DateLogicalTypeProcessor.java
new file mode 100644
index 00000000000..9d00e53d261
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/DateLogicalTypeProcessor.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro.processors;
+
+import org.apache.hudi.avro.AvroLogicalTypeEnum;
+import org.apache.hudi.common.util.collection.Pair;
+
+import java.time.LocalDate;
+import java.time.format.DateTimeParseException;
+
+public abstract class DateLogicalTypeProcessor extends 
TimeLogicalTypeProcessor {
+  public DateLogicalTypeProcessor() {
+    super(AvroLogicalTypeEnum.DATE);
+  }
+
+  protected Pair<Boolean, LocalDate> convertToLocalDate(String input) {
+    // Parse the input date string, DateTimeFormatter.ISO_LOCAL_DATE is implied 
here
+    LocalDate date = null;
+    try {
+      // Try parsing as an ISO date
+      date = LocalDate.parse(input);
+    } catch (DateTimeParseException ignore) {
+      /* ignore */
+    }
+    return Pair.of(date != null, date);
+  }
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/avro/processors/DecimalLogicalTypeProcessor.java
 
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/DecimalLogicalTypeProcessor.java
new file mode 100644
index 00000000000..1c49e086e46
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/DecimalLogicalTypeProcessor.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro.processors;
+
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+
+import java.math.BigDecimal;
+
+public abstract class DecimalLogicalTypeProcessor extends JsonFieldProcessor {
+
+  /**
+   * Check if the given schema is a valid decimal type configuration.
+   */
+  protected static boolean isValidDecimalTypeConfig(Schema schema) {
+    LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) 
schema.getLogicalType();
+    // If the logical type is found invalid when the schema is parsed, the 
Avro Schema.parse will just silently
+    // set the logical type to null instead of throwing exceptions. 
Correspondingly, we just check if it is null here.
+    if (decimalType == null) {
+      return false;
+    }
+    // Even though schema is validated at schema parsing phase, still validate 
here to be defensive.
+    decimalType.validate(schema);
+    return true;
+  }
+
+  /**
+   * Parse the object to BigDecimal.
+   *
+   * @param obj Object to be parsed
+   * @return Pair object, with left as boolean indicating if the parsing was 
successful and right as the
+   * BigDecimal value.
+   */
+  protected static Pair<Boolean, BigDecimal> parseObjectToBigDecimal(Object 
obj) {
+    if (obj instanceof Number) {
+      return Pair.of(true, BigDecimal.valueOf(((Number) obj).doubleValue()));
+    }
+
+    // Otherwise, the object may be a number in String format.
+    if (obj instanceof String) {
+      BigDecimal bigDecimal = null;
+      try {
+        bigDecimal = new BigDecimal(((String) obj));
+      } catch (java.lang.NumberFormatException ignored) {
+        /* ignore */
+      }
+      return Pair.of(bigDecimal != null, bigDecimal);
+    }
+    return Pair.of(false, null);
+  }
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/avro/processors/DurationLogicalTypeProcessor.java
 
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/DurationLogicalTypeProcessor.java
new file mode 100644
index 00000000000..47e37db5c50
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/DurationLogicalTypeProcessor.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro.processors;
+
+import org.apache.hudi.avro.AvroLogicalTypeEnum;
+
+import org.apache.avro.LogicalType;
+import org.apache.avro.Schema;
+
+import java.util.List;
+
+public abstract class DurationLogicalTypeProcessor extends JsonFieldProcessor {
+  private static final int NUM_ELEMENTS_FOR_DURATION_TYPE = 3;
+
+  /**
+   * Tells whether the input has the shape expected for a duration: a list of exactly 3 integers
+   * representing months, days and milliseconds.
+   *
+   * @param value deserialized JSON value to inspect
+   * @return {@code true} only for a 3-element list whose elements are all {@link Integer}
+   */
+  protected boolean isValidDurationInput(Object value) {
+    if (value instanceof List<?>) {
+      List<?> parts = (List<?>) value;
+      return parts.size() == NUM_ELEMENTS_FOR_DURATION_TYPE
+          && parts.stream().allMatch(part -> part instanceof Integer);
+    }
+    return false;
+  }
+
+  /**
+   * Check if the given schema is a valid duration type configuration.
+   *
+   * <p>All of the following must hold:
+   * 1. The Avro type is "Fixed".
+   * 2. The fixed size is 12 bytes, as it holds 3 integers.
+   * 3. The logical type name is "duration". The name might be stored in different places based on the
+   *    Avro version being used, so both the parsed logical type and the raw "logicalType" property
+   *    are consulted.
+   *
+   * @param schema schema to check
+   * @return {@code true} when the schema satisfies all three conditions
+   */
+  protected static boolean isValidDurationTypeConfig(Schema schema) {
+    if (!schema.getType().equals(Schema.Type.FIXED)) {
+      return false;
+    }
+    if (schema.getFixedSize() != Integer.BYTES * NUM_ELEMENTS_FOR_DURATION_TYPE) {
+      return false;
+    }
+    String expectedName = AvroLogicalTypeEnum.DURATION.getValue();
+    LogicalType attachedType = schema.getLogicalType();
+    if (attachedType != null && attachedType.getName().equals(expectedName)) {
+      return true;
+    }
+    String rawTypeProp = schema.getProp("logicalType");
+    return rawTypeProp != null && rawTypeProp.equals(expectedName);
+  }
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/avro/processors/EnumTypeProcessor.java
 
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/EnumTypeProcessor.java
new file mode 100644
index 00000000000..d21ace8a8a5
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/EnumTypeProcessor.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro.processors;
+
+import org.apache.hudi.exception.HoodieJsonToAvroConversionException;
+
+import org.apache.avro.Schema;
+
+public abstract class EnumTypeProcessor extends JsonFieldProcessor {
+
+  protected Object convertToJavaObject(Object value, String name, Schema 
schema) {
+    if (schema.getEnumSymbols().contains(value.toString())) {
+      return value.toString();
+    }
+    throw new HoodieJsonToAvroConversionException(String.format("Symbol %s not 
in enum", value));
+  }
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/avro/processors/FixedTypeProcessor.java
 
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/FixedTypeProcessor.java
new file mode 100644
index 00000000000..0604ddad19c
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/FixedTypeProcessor.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro.processors;
+
+import org.apache.avro.Schema;
+
+import java.util.List;
+
+public abstract class FixedTypeProcessor extends JsonFieldProcessor {
+  protected byte[] convertToJavaObject(Object value, String name, Schema 
schema) {
+    // The ObjectMapper use List to represent FixedType
+    // eg: "decimal_val": [0, 0, 14, -63, -52] will convert to 
ArrayList<Integer>
+    List<Integer> converval = (List<Integer>) value;
+    byte[] src = new byte[converval.size()];
+    for (int i = 0; i < converval.size(); i++) {
+      src[i] = converval.get(i).byteValue();
+    }
+    byte[] dst = new byte[schema.getFixedSize()];
+    System.arraycopy(src, 0, dst, 0, Math.min(schema.getFixedSize(), 
src.length));
+    return dst;
+  }
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/avro/processors/JsonFieldProcessor.java
 
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/JsonFieldProcessor.java
new file mode 100644
index 00000000000..dd76c463eca
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/JsonFieldProcessor.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro.processors;
+
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieJsonConversionException;
+
+import org.apache.avro.Schema;
+
+import java.io.Serializable;
+
+public abstract class JsonFieldProcessor implements Serializable {
+
+  /**
+   * Converts the value through {@link #convert(Object, String, Schema)} and unwraps the result.
+   *
+   * @param value  value to convert
+   * @param name   field name
+   * @param schema schema describing the target type
+   * @return the right side of the pair produced by {@code convert}
+   * @throws HoodieJsonConversionException when {@code convert} reports a failure
+   */
+  public Object convertField(Object value, String name, Schema schema) {
+    Pair<Boolean, Object> outcome = convert(value, name, schema);
+    if (outcome.getLeft()) {
+      return outcome.getRight();
+    }
+    throw new HoodieJsonConversionException("failed to convert json to avro");
+  }
+
+  /**
+   * Performs the type-specific conversion.
+   *
+   * @param value  value to convert
+   * @param name   field name
+   * @param schema schema describing the target type
+   * @return pair whose left side tells whether the conversion succeeded and whose right side is the
+   *         converted value
+   */
+  public abstract Pair<Boolean, Object> convert(Object value, String name, Schema schema);
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/avro/processors/LocalTimestampMicroLogicalTypeProcessor.java
 
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/LocalTimestampMicroLogicalTypeProcessor.java
new file mode 100644
index 00000000000..20ea08de9d8
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/LocalTimestampMicroLogicalTypeProcessor.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro.processors;
+
+import org.apache.hudi.avro.AvroLogicalTypeEnum;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.avro.Schema;
+
+import java.time.LocalDateTime;
+import java.time.temporal.ChronoField;
+
+/**
+ * Processor for LocalTimestampMicro logical type.
+ */
+public class LocalTimestampMicroLogicalTypeProcessor extends TimeLogicalTypeProcessor {
+  public LocalTimestampMicroLogicalTypeProcessor() {
+    super(AvroLogicalTypeEnum.LOCAL_TIMESTAMP_MICROS);
+  }
+
+  /**
+   * Converts the value with a {@link Parser.LongParser} whose handling of date-time strings is
+   * replaced: such a string is turned into the number of microseconds between
+   * {@code LOCAL_UNIX_EPOCH} and the parsed local date-time.
+   */
+  @Override
+  public Pair<Boolean, Object> convert(Object value, String name, Schema schema) {
+    Parser microsParser = new Parser.LongParser() {
+      @Override
+      public Pair<Boolean, Object> handleStringValue(String value) {
+        if (isWellFormedDateTime(value)) {
+          Pair<Boolean, LocalDateTime> parsed = convertToLocalDateTime(value);
+          if (parsed.getLeft()) {
+            // Calculate the difference in microseconds
+            long microsSinceEpoch = LOCAL_UNIX_EPOCH.until(
+                parsed.getRight(), ChronoField.MICRO_OF_SECOND.getBaseUnit());
+            return Pair.of(true, microsSinceEpoch);
+          }
+        }
+        return Pair.of(false, null);
+      }
+    };
+    return convertCommon(microsParser, value, schema);
+  }
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/avro/processors/LocalTimestampMilliLogicalTypeProcessor.java
 
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/LocalTimestampMilliLogicalTypeProcessor.java
new file mode 100644
index 00000000000..62753daf484
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/LocalTimestampMilliLogicalTypeProcessor.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro.processors;
+
+import org.apache.hudi.avro.AvroLogicalTypeEnum;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.avro.Schema;
+
+import java.time.LocalDateTime;
+import java.time.temporal.ChronoField;
+
+/**
+ * Processor for LocalTimestampMilli logical type.
+ */
+public class LocalTimestampMilliLogicalTypeProcessor extends TimeLogicalTypeProcessor {
+  public LocalTimestampMilliLogicalTypeProcessor() {
+    super(AvroLogicalTypeEnum.LOCAL_TIMESTAMP_MILLIS);
+  }
+
+  /**
+   * Converts the value with a {@link Parser.LongParser} whose handling of date-time strings is
+   * replaced: such a string is turned into the number of milliseconds between
+   * {@code LOCAL_UNIX_EPOCH} and the parsed local date-time.
+   */
+  @Override
+  public Pair<Boolean, Object> convert(Object value, String name, Schema schema) {
+    Parser millisParser = new Parser.LongParser() {
+      @Override
+      public Pair<Boolean, Object> handleStringValue(String value) {
+        if (isWellFormedDateTime(value)) {
+          Pair<Boolean, LocalDateTime> parsed = convertToLocalDateTime(value);
+          if (parsed.getLeft()) {
+            // Calculate the difference in milliseconds
+            long millisSinceEpoch = LOCAL_UNIX_EPOCH.until(
+                parsed.getRight(), ChronoField.MILLI_OF_SECOND.getBaseUnit());
+            return Pair.of(true, millisSinceEpoch);
+          }
+        }
+        return Pair.of(false, null);
+      }
+    };
+    return convertCommon(millisParser, value, schema);
+  }
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/avro/processors/Parser.java 
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/Parser.java
new file mode 100644
index 00000000000..4a6e4ee9fb9
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/processors/Parser.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro.processors;
+
+import org.apache.hudi.common.util.collection.Pair;
+
+/**
+ * Strategy for turning a JSON value into the Java object backing a field. Each hook returns a pair
+ * whose left side tells whether the conversion succeeded and whose right side is the converted value.
+ *
+ * <p>{@code TimeLogicalTypeProcessor#convertCommon} dispatches to the hooks as follows:
+ * {@link #handleNumberValue(Number)} for {@link Number} input,
+ * {@link #handleStringNumber(String)} for strings made of digits with an optional sign, and
+ * {@link #handleStringValue(String)} for any other string.
+ */
+public abstract class Parser {
+  abstract Pair<Boolean, Object> handleNumberValue(Number value);
+
+  abstract Pair<Boolean, Object> handleStringNumber(String value);
+
+  abstract Pair<Boolean, Object> handleStringValue(String value);
+
+  /**
+   * Parser producing {@link Integer} values.
+   */
+  public static class IntParser extends Parser {
+    @Override
+    public Pair<Boolean, Object> handleNumberValue(Number value) {
+      return Pair.of(true, value.intValue());
+    }
+
+    @Override
+    public Pair<Boolean, Object> handleStringNumber(String value) {
+      return Pair.of(true, Integer.parseInt(value));
+    }
+
+    @Override
+    public Pair<Boolean, Object> handleStringValue(String value) {
+      return Pair.of(true, Integer.valueOf(value));
+    }
+  }
+
+  /**
+   * Parser producing {@link java.sql.Date} values. Numeric input is multiplied by the number of
+   * milliseconds per day before being handed to the {@code java.sql.Date} constructor; other strings
+   * go through {@link java.sql.Date#valueOf(String)}.
+   */
+  public static class DateParser extends Parser {
+
+    // Declared as a long so the multiplications below are carried out in 64-bit arithmetic.
+    private static final long MILLI_SECONDS_PER_DAY = 86_400_000L;
+
+    @Override
+    public Pair<Boolean, Object> handleNumberValue(Number value) {
+      return Pair.of(true, new java.sql.Date(value.intValue() * MILLI_SECONDS_PER_DAY));
+    }
+
+    @Override
+    public Pair<Boolean, Object> handleStringNumber(String value) {
+      return Pair.of(true, new java.sql.Date(Integer.parseInt(value) * MILLI_SECONDS_PER_DAY));
+    }
+
+    @Override
+    public Pair<Boolean, Object> handleStringValue(String value) {
+      return Pair.of(true, java.sql.Date.valueOf(value));
+    }
+  }
+
+  /**
+   * Parser producing {@link Long} values.
+   */
+  public static class LongParser extends Parser {
+    @Override
+    public Pair<Boolean, Object> handleNumberValue(Number value) {
+      return Pair.of(true, value.longValue());
+    }
+
+    @Override
+    public Pair<Boolean, Object> handleStringNumber(String value) {
+      return Pair.of(true, Long.parseLong(value));
+    }
+
+    @Override
+    public Pair<Boolean, Object> handleStringValue(String value) {
+      return Pair.of(true, Long.valueOf(value));
+    }
+  }
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/avro/processors/TimeLogicalTypeProcessor.java
 
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/TimeLogicalTypeProcessor.java
new file mode 100644
index 00000000000..9cb3b20fb0b
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/TimeLogicalTypeProcessor.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro.processors;
+
+import org.apache.hudi.avro.AvroLogicalTypeEnum;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.avro.LogicalType;
+import org.apache.avro.Schema;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZonedDateTime;
+import java.time.chrono.IsoChronology;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
+import java.time.format.DateTimeParseException;
+import java.time.format.ResolverStyle;
+import java.time.temporal.ChronoField;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.regex.Pattern;
+
+import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE;
+import static java.time.format.DateTimeFormatter.ISO_LOCAL_TIME;
+
+/**
+ * Base class for processors converting an object to the Avro time-related logical types. The parsing
+ * contexts defined here cover time-of-day, date, timestamp and local-timestamp logical types.
+ */
+public abstract class TimeLogicalTypeProcessor extends JsonFieldProcessor {
+
+  protected static final LocalDateTime LOCAL_UNIX_EPOCH = LocalDateTime.of(1970, 1, 1, 0, 0, 0, 0);
+
+  // Logical type the processor is handling.
+  private final AvroLogicalTypeEnum logicalTypeEnum;
+
+  public TimeLogicalTypeProcessor(AvroLogicalTypeEnum logicalTypeEnum) {
+    this.logicalTypeEnum = logicalTypeEnum;
+  }
+
+  /**
+   * Main function that converts the input to an Object with the java data type specified by schema.
+   *
+   * <p>A {@link Number} goes to {@link Parser#handleNumberValue(Number)}, a string made of digits with
+   * an optional sign goes to {@link Parser#handleStringNumber(String)}, and any other string goes to
+   * {@link Parser#handleStringValue(String)}.
+   *
+   * @param parser parser supplying the per-input-kind conversions
+   * @param value  value to convert
+   * @param schema schema carrying the logical type; the logical type is validated against it
+   * @return the parser result, or a failed pair when the schema has no logical type or the value is
+   *         neither a number nor a string
+   */
+  public Pair<Boolean, Object> convertCommon(Parser parser, Object value, Schema schema) {
+    LogicalType logicalType = schema.getLogicalType();
+    if (logicalType == null) {
+      return Pair.of(false, null);
+    }
+    logicalType.validate(schema);
+    if (value instanceof Number) {
+      return parser.handleNumberValue((Number) value);
+    }
+    if (value instanceof String) {
+      String valStr = (String) value;
+      if (ALL_DIGITS_WITH_OPTIONAL_SIGN.matcher(valStr).matches()) {
+        return parser.handleStringNumber(valStr);
+      } else {
+        return parser.handleStringValue(valStr);
+      }
+    }
+    return Pair.of(false, null);
+  }
+
+  /**
+   * Returns the formatter registered for the handled logical type, or {@code null} when none is registered.
+   */
+  protected DateTimeFormatter getDateTimeFormatter() {
+    DateTimeParseContext ctx = DATE_TIME_PARSE_CONTEXT_MAP.get(logicalTypeEnum);
+    return ctx == null ? null : ctx.dateTimeFormatter;
+  }
+
+  /**
+   * Returns the validation pattern registered for the handled logical type, or {@code null} when
+   * there is none.
+   */
+  protected Pattern getDateTimePattern() {
+    DateTimeParseContext ctx = DATE_TIME_PARSE_CONTEXT_MAP.get(logicalTypeEnum);
+    return ctx == null ? null : ctx.dateTimePattern;
+  }
+
+  // Depending on the logical type the processor handles, they use different parsing context
+  // when they need to parse a timestamp string in handleStringValue.
+  private static class DateTimeParseContext {
+    public DateTimeParseContext(DateTimeFormatter dateTimeFormatter, Pattern dateTimePattern) {
+      this.dateTimeFormatter = dateTimeFormatter;
+      this.dateTimePattern = dateTimePattern;
+    }
+
+    public final Pattern dateTimePattern;
+
+    public final DateTimeFormatter dateTimeFormatter;
+  }
+
+  private static final Map<AvroLogicalTypeEnum, DateTimeParseContext> DATE_TIME_PARSE_CONTEXT_MAP = getParseContext();
+
+  private static Map<AvroLogicalTypeEnum, DateTimeParseContext> getParseContext() {
+    // The pattern is derived from ISO_LOCAL_DATE_TIME definition with the relaxation on the separator.
+    DateTimeFormatter localDateTimeFormatter = new DateTimeFormatterBuilder()
+        .parseCaseInsensitive()
+        .append(ISO_LOCAL_DATE)
+        .optionalStart()
+        .appendLiteral('T')
+        .optionalEnd()
+        .optionalStart()
+        .appendLiteral(' ')
+        .optionalEnd()
+        .append(ISO_LOCAL_TIME)
+        .toFormatter()
+        .withResolverStyle(ResolverStyle.STRICT)
+        .withChronology(IsoChronology.INSTANCE);
+    // Formatter for parsing timestamp.
+    // The pattern is derived from ISO_OFFSET_DATE_TIME definition with the relaxation on the separator.
+    // Pattern asserts the string is
+    // <optional sign><Year>-<Month>-<Day><separator><Hour>:<Minute> + optional <second> + optional <fractional second> + optional <zone offset>
+    // <separator> is 'T' or ' '
+    // For example, "2024-07-13T11:36:01.951Z", "2024-07-13T11:36:01.951+01:00",
+    // "2024-07-13T11:36:01Z", "2024-07-13T11:36:01+01:00",
+    // "2024-07-13 11:36:01.951Z", "2024-07-13 11:36:01.951+01:00".
+    // See TestMercifulJsonConverter#timestampLogicalTypeGoodCaseTest
+    // and #timestampLogicalTypeBadTest for supported and unsupported cases.
+    DateTimeParseContext dateTimestampParseContext = new DateTimeParseContext(
+        new DateTimeFormatterBuilder()
+            .parseCaseInsensitive()
+            .append(localDateTimeFormatter)
+            .optionalStart()
+            .appendOffsetId()
+            .optionalEnd()
+            .parseDefaulting(ChronoField.OFFSET_SECONDS, 0L)
+            .toFormatter()
+            .withResolverStyle(ResolverStyle.STRICT)
+            .withChronology(IsoChronology.INSTANCE),
+        null /* match everything*/);
+    // Formatter for parsing time of day. The pattern is derived from ISO_LOCAL_TIME definition.
+    // Pattern asserts the string is
+    // <optional sign><Hour>:<Minute> + optional <second> + optional <fractional second>
+    // For example, "11:36:01.951".
+    // See TestMercifulJsonConverter#timeLogicalTypeTest
+    // and #timeLogicalTypeBadCaseTest for supported and unsupported cases.
+    DateTimeParseContext dateTimeParseContext = new DateTimeParseContext(
+        ISO_LOCAL_TIME,
+        Pattern.compile("^[+-]?\\d{2}:\\d{2}(?::\\d{2}(?:\\.\\d{1,9})?)?"));
+    // Formatter for parsing local timestamp.
+    // The pattern is derived from ISO_LOCAL_DATE_TIME definition with the relaxation on the separator.
+    // Pattern asserts the string is
+    // <optional sign><Year>-<Month>-<Day><separator><Hour>:<Minute> + optional <second> + optional <fractional second>
+    // <separator> is 'T' or ' '
+    // For example, "2024-07-13T11:36:01.951", "2024-07-13 11:36:01.951".
+    // See TestMercifulJsonConverter#localTimestampLogicalTypeGoodCaseTest
+    // and #localTimestampLogicalTypeBadTest for supported and unsupported cases.
+    DateTimeParseContext localTimestampParseContext = new DateTimeParseContext(
+        localDateTimeFormatter,
+        Pattern.compile("^[+-]?\\d{4,10}-\\d{2}-\\d{2}[T ]\\d{2}:\\d{2}(?::\\d{2}(?:\\.\\d{1,9})?)?")
+    );
+    // Formatter for parsing local date. The pattern is derived from ISO_LOCAL_DATE definition.
+    // Pattern asserts the string is
+    // <optional sign><Year>-<Month>-<Day>
+    // For example, "2024-07-13".
+    // See TestMercifulJsonConverter#dateLogicalTypeTest for supported and unsupported cases.
+    DateTimeParseContext localDateParseContext = new DateTimeParseContext(
+        ISO_LOCAL_DATE,
+        // The day part is exactly two digits; no quantifier modifier is needed after it.
+        Pattern.compile("^[+-]?\\d{4,10}-\\d{2}-\\d{2}")
+    );
+
+    EnumMap<AvroLogicalTypeEnum, DateTimeParseContext> ctx = new EnumMap<>(AvroLogicalTypeEnum.class);
+    ctx.put(AvroLogicalTypeEnum.TIME_MICROS, dateTimeParseContext);
+    ctx.put(AvroLogicalTypeEnum.TIME_MILLIS, dateTimeParseContext);
+    ctx.put(AvroLogicalTypeEnum.DATE, localDateParseContext);
+    ctx.put(AvroLogicalTypeEnum.LOCAL_TIMESTAMP_MICROS, localTimestampParseContext);
+    ctx.put(AvroLogicalTypeEnum.LOCAL_TIMESTAMP_MILLIS, localTimestampParseContext);
+    ctx.put(AvroLogicalTypeEnum.TIMESTAMP_MICROS, dateTimestampParseContext);
+    ctx.put(AvroLogicalTypeEnum.TIMESTAMP_MILLIS, dateTimestampParseContext);
+    return Collections.unmodifiableMap(ctx);
+  }
+
+  // Pattern validating if it is a number in string form.
+  // Only check at most 19 digits as this is the max num of digits for LONG.MAX_VALUE to contain the cost of regex matching.
+  protected static final Pattern ALL_DIGITS_WITH_OPTIONAL_SIGN = Pattern.compile("^[-+]?\\d{1,19}$");
+
+  /**
+   * Check if the given string is a well-formed date time string.
+   * If no pattern is defined, it will always return true.
+   */
+  protected boolean isWellFormedDateTime(String value) {
+    Pattern pattern = getDateTimePattern();
+    return pattern == null || pattern.matcher(value).matches();
+  }
+
+  /**
+   * Parses a timestamp string into an {@link Instant} using the formatter of the handled logical type.
+   *
+   * @param input timestamp string
+   * @return pair of success flag and the parsed instant ({@code null} when parsing failed)
+   */
+  protected Pair<Boolean, Instant> convertToInstantTime(String input) {
+    // Parse the input timestamp
+    // The input string is assumed in the format:
+    // <optional sign><Year>-<Month>-<Day><separator><Hour>:<Minute> + optional <second> + optional <fractional second> + optional <zone offset>
+    // <separator> is 'T' or ' '
+    Instant time = null;
+    try {
+      ZonedDateTime dateTime = ZonedDateTime.parse(input, getDateTimeFormatter());
+      time = dateTime.toInstant();
+    } catch (DateTimeParseException ignore) {
+      /* malformed input: reported through the boolean flag below */
+    }
+    return Pair.of(time != null, time);
+  }
+
+  /**
+   * Parses a time-of-day string into a {@link LocalTime}.
+   *
+   * @param input time-of-day string
+   * @return pair of success flag and the parsed time ({@code null} when parsing failed)
+   */
+  protected Pair<Boolean, LocalTime> convertToLocalTime(String input) {
+    // Parse the input time, DateTimeFormatter.ISO_LOCAL_TIME is implied here
+    LocalTime time = null;
+    try {
+      // Try parsing as an ISO local time
+      time = LocalTime.parse(input);
+    } catch (DateTimeParseException ignore) {
+      /* malformed input: reported through the boolean flag below */
+    }
+    return Pair.of(time != null, time);
+  }
+
+  /**
+   * Parses a local timestamp string into a {@link LocalDateTime} using the formatter of the handled
+   * logical type.
+   *
+   * @param input local timestamp string
+   * @return pair of success flag and the parsed date-time ({@code null} when parsing failed)
+   */
+  protected Pair<Boolean, LocalDateTime> convertToLocalDateTime(String input) {
+    LocalDateTime time = null;
+    try {
+      // Try parsing as a local date-time with the formatter of the handled logical type
+      time = LocalDateTime.parse(input, getDateTimeFormatter());
+    } catch (DateTimeParseException ignore) {
+      /* malformed input: reported through the boolean flag below */
+    }
+    return Pair.of(time != null, time);
+  }
+
+  /**
+   * Validates the string against the pattern of the handled logical type, parses it, and maps the
+   * parsed value with the supplied function.
+   *
+   * <p>When {@code localTimeFunction} is non-null the string is parsed as a {@link LocalTime} and
+   * {@code instantTimeFunction} is not consulted; otherwise, when {@code instantTimeFunction} is
+   * non-null, the string is parsed as an {@link Instant}.
+   *
+   * @param value               string to convert
+   * @param localTimeFunction   mapping applied to the parsed local time, or {@code null}
+   * @param instantTimeFunction mapping applied to the parsed instant, or {@code null}
+   * @return pair of success flag and mapped value; a failed pair when the string is not well formed,
+   *         cannot be parsed, or both functions are {@code null}
+   */
+  protected Pair<Boolean, Object> convertDateTime(
+      String value,
+      Function<LocalTime, Object> localTimeFunction,
+      Function<Instant, Object> instantTimeFunction) {
+
+    if (!isWellFormedDateTime(value)) {
+      return Pair.of(false, null);
+    }
+
+    if (localTimeFunction != null) {
+      Pair<Boolean, LocalTime> result = convertToLocalTime(value);
+      if (!result.getLeft()) {
+        return Pair.of(false, null);
+      }
+      return Pair.of(true, localTimeFunction.apply(result.getRight()));
+    }
+
+    if (instantTimeFunction != null) {
+      Pair<Boolean, Instant> result = convertToInstantTime(value);
+      if (!result.getLeft()) {
+        return Pair.of(false, null);
+      }
+      return Pair.of(true, instantTimeFunction.apply(result.getRight()));
+    }
+
+    return Pair.of(false, null);  // Neither mapping function was supplied
+  }
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/avro/processors/TimeMicroLogicalTypeProcessor.java
 
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/TimeMicroLogicalTypeProcessor.java
new file mode 100644
index 00000000000..75a011ed28e
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/TimeMicroLogicalTypeProcessor.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro.processors;
+
+import org.apache.hudi.avro.AvroLogicalTypeEnum;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.avro.Schema;
+
+/**
+ * Processor for TimeMicro logical type.
+ */
+public class TimeMicroLogicalTypeProcessor extends TimeLogicalTypeProcessor {
+  public TimeMicroLogicalTypeProcessor() {
+    super(AvroLogicalTypeEnum.TIME_MICROS);
+  }
+
+  @Override
+  public Pair<Boolean, Object> convert(
+      Object value, String name, Schema schema) {
+    return convertCommon(
+        new Parser.LongParser() {
+          @Override
+          public Pair<Boolean, Object> handleStringValue(String value) {
+            return convertDateTime(
+                value,
+                time -> (long) time.toSecondOfDay() * 1000000 + time.getNano() 
/ 1000,  // Micros of day
+                null);
+          }
+        },
+        value, schema);
+  }
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/avro/processors/TimeMilliLogicalTypeProcessor.java
 
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/TimeMilliLogicalTypeProcessor.java
new file mode 100644
index 00000000000..f6f2ed2c2bf
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/TimeMilliLogicalTypeProcessor.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro.processors;
+
+import org.apache.hudi.avro.AvroLogicalTypeEnum;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.avro.Schema;
+
+/**
+ * Processor for TimeMilli logical type.
+ */
+public class TimeMilliLogicalTypeProcessor extends TimeLogicalTypeProcessor {
+  public TimeMilliLogicalTypeProcessor() {
+    super(AvroLogicalTypeEnum.TIME_MILLIS);
+  }
+
+  @Override
+  public Pair<Boolean, Object> convert(
+      Object value, String name, Schema schema) {
+    return convertCommon(
+        new Parser.IntParser() {
+          @Override
+          public Pair<Boolean, Object> handleStringValue(String value) {
+            return convertDateTime(
+                value,
+                time -> time.toSecondOfDay() * 1000 + time.getNano() / 
1000000,  // Millis of day
+                null);
+          }
+        },
+        value, schema);
+  }
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/avro/processors/TimestampMicroLogicalTypeProcessor.java
 
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/TimestampMicroLogicalTypeProcessor.java
new file mode 100644
index 00000000000..b1cf7353382
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/TimestampMicroLogicalTypeProcessor.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro.processors;
+
+import org.apache.hudi.avro.AvroLogicalTypeEnum;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.avro.Schema;
+
+import java.time.Instant;
+import java.time.temporal.ChronoField;
+
+/**
+ * Processor for the TimestampMicros logical type; string values are parsed into micros since the epoch.
+ */
+public class TimestampMicroLogicalTypeProcessor extends 
TimeLogicalTypeProcessor {
+  public TimestampMicroLogicalTypeProcessor() {
+    super(AvroLogicalTypeEnum.TIMESTAMP_MICROS);
+  }
+
+  @Override
+  public Pair<Boolean, Object> convert(
+      Object value, String name, Schema schema) {
+    return convertCommon(
+        new Parser.LongParser() {
+          @Override
+          public Pair<Boolean, Object> handleStringValue(String value) {
+            return convertDateTime(
+                value,
+                null,
+                time -> Instant.EPOCH.until(time, 
ChronoField.MICRO_OF_SECOND.getBaseUnit()));  // Diff in micro
+          }
+        },
+        value, schema);
+  }
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/avro/processors/TimestampMilliLogicalTypeProcessor.java
 
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/TimestampMilliLogicalTypeProcessor.java
new file mode 100644
index 00000000000..5c5493f38b3
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/avro/processors/TimestampMilliLogicalTypeProcessor.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro.processors;
+
+import org.apache.hudi.avro.AvroLogicalTypeEnum;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.avro.Schema;
+
+import java.time.Instant;
+import java.time.temporal.ChronoField;
+
+/**
+ * Processor for the TimestampMillis logical type; string values are parsed into millis since the epoch.
+ */
+public class TimestampMilliLogicalTypeProcessor extends 
TimeLogicalTypeProcessor {
+  public TimestampMilliLogicalTypeProcessor() {
+    super(AvroLogicalTypeEnum.TIMESTAMP_MILLIS);
+  }
+
+  @Override
+  public Pair<Boolean, Object> convert(
+      Object value, String name, Schema schema) {
+    return convertCommon(
+        new Parser.LongParser() {
+          @Override
+          public Pair<Boolean, Object> handleStringValue(String value) {
+            return convertDateTime(
+                value,
+                null,
+                time -> Instant.EPOCH.until(time, 
ChronoField.MILLI_OF_SECOND.getBaseUnit()));  // Diff in millis
+          }
+        },
+        value, schema);
+  }
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/exception/HoodieJsonConversionException.java
 
b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieJsonConversionException.java
new file mode 100644
index 00000000000..08c476480a0
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieJsonConversionException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.exception;
+
+/**
+ * Exception class for JSON conversion failures; extended by {@link HoodieJsonToAvroConversionException}.
+ */
+public class HoodieJsonConversionException extends HoodieException {
+
+  public HoodieJsonConversionException(String msg) {
+    super(msg);
+  }
+
+  public HoodieJsonConversionException(String msg, Throwable t) {
+    super(msg, t);
+  }
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/exception/HoodieJsonToAvroConversionException.java
 
b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieJsonToAvroConversionException.java
new file mode 100644
index 00000000000..b90cd53597e
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieJsonToAvroConversionException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.exception;
+
+/**
+ * Exception class for any JSON to Avro conversion issue.
+ */
+public class HoodieJsonToAvroConversionException extends 
HoodieJsonConversionException {
+
+  public HoodieJsonToAvroConversionException(String msg) {
+    super(msg);
+  }
+
+  public HoodieJsonToAvroConversionException(String msg, Throwable t) {
+    super(msg, t);
+  }
+}
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/avro/TestMercifulJsonConverter.java 
b/hudi-common/src/test/java/org/apache/hudi/avro/TestMercifulJsonConverter.java
index 49e707a1c9e..0c9be20570b 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/avro/TestMercifulJsonConverter.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/avro/TestMercifulJsonConverter.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.avro;
 
 import org.apache.hudi.common.testutils.SchemaTestUtil;
+import org.apache.hudi.exception.HoodieJsonToAvroConversionException;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.avro.Conversions;
@@ -97,7 +98,7 @@ public class TestMercifulJsonConverter {
     String json = MAPPER.writeValueAsString(data);
 
     // Schedule with timestamp same as that of committed instant
-    
assertThrows(MercifulJsonConverter.HoodieJsonToAvroConversionException.class, 
() -> {
+    assertThrows(HoodieJsonToAvroConversionException.class, () -> {
       CONVERTER.convert(json, schema);
     });
   }
@@ -266,7 +267,7 @@ public class TestMercifulJsonConverter {
 
     Schema schema = SchemaTestUtil.getSchemaFromResourceFilePath(schemaFile);
     // Schedule with timestamp same as that of committed instant
-    
assertThrows(MercifulJsonConverter.HoodieJsonToAvroConversionException.class, 
() -> {
+    assertThrows(HoodieJsonToAvroConversionException.class, () -> {
       CONVERTER.convert(json, schema);
     });
   }
@@ -336,7 +337,7 @@ public class TestMercifulJsonConverter {
     Map<String, Object> data = new HashMap<>();
     data.put("dateField", input);
     String json = MAPPER.writeValueAsString(data);
-    
assertThrows(MercifulJsonConverter.HoodieJsonToAvroConversionException.class, 
() -> {
+    assertThrows(HoodieJsonToAvroConversionException.class, () -> {
       CONVERTER.convert(json, schema);
     });
   }
@@ -442,7 +443,7 @@ public class TestMercifulJsonConverter {
     data.put("timestamp", input);
     String json = MAPPER.writeValueAsString(data);
     // Schedule with timestamp same as that of committed instant
-    
assertThrows(MercifulJsonConverter.HoodieJsonToAvroConversionException.class, 
() -> {
+    assertThrows(HoodieJsonToAvroConversionException.class, () -> {
       CONVERTER.convert(json, schema);
     });
   }
@@ -565,7 +566,7 @@ public class TestMercifulJsonConverter {
     data.put("timestampMillisField", validInput);
     data.put("timestampMicrosField", badInput);
     // Schedule with timestamp same as that of committed instant
-    
assertThrows(MercifulJsonConverter.HoodieJsonToAvroConversionException.class, 
() -> {
+    assertThrows(HoodieJsonToAvroConversionException.class, () -> {
       CONVERTER.convert(MAPPER.writeValueAsString(data), schema);
     });
 
@@ -573,7 +574,7 @@ public class TestMercifulJsonConverter {
     data.put("timestampMillisField", badInput);
     data.put("timestampMicrosField", validInput);
     // Schedule with timestamp same as that of committed instant
-    
assertThrows(MercifulJsonConverter.HoodieJsonToAvroConversionException.class, 
() -> {
+    assertThrows(HoodieJsonToAvroConversionException.class, () -> {
       CONVERTER.convert(MAPPER.writeValueAsString(data), schema);
     });
   }
@@ -657,7 +658,7 @@ public class TestMercifulJsonConverter {
     data.put("timeMicroField", validInput);
     data.put("timeMillisField", invalidInput);
     // Schedule with timestamp same as that of committed instant
-    
assertThrows(MercifulJsonConverter.HoodieJsonToAvroConversionException.class, 
() -> {
+    assertThrows(HoodieJsonToAvroConversionException.class, () -> {
       CONVERTER.convert(MAPPER.writeValueAsString(data), schema);
     });
 
@@ -665,7 +666,7 @@ public class TestMercifulJsonConverter {
     data.put("timeMicroField", invalidInput);
     data.put("timeMillisField", validInput);
     // Schedule with timestamp same as that of committed instant
-    
assertThrows(MercifulJsonConverter.HoodieJsonToAvroConversionException.class, 
() -> {
+    assertThrows(HoodieJsonToAvroConversionException.class, () -> {
       CONVERTER.convert(MAPPER.writeValueAsString(data), schema);
     });
   }

Reply via email to