Vaibhav Sinha created HUDI-1864:
-----------------------------------

             Summary: Support for java.time.LocalDate in 
TimestampBasedAvroKeyGenerator
                 Key: HUDI-1864
                 URL: https://issues.apache.org/jira/browse/HUDI-1864
             Project: Apache Hudi
          Issue Type: Improvement
            Reporter: Vaibhav Sinha


When we read data from a MySQL table that has a column of type {{Date}}, Spark 
represents it as an instance of {{java.time.LocalDate}}. If I try to use this 
column for partitioning while writing to Hudi, I get the following 
exception:

 
{code:java}
Caused by: org.apache.hudi.exception.HoodieKeyGeneratorException: Unable to 
parse input partition field :2021-04-21
        at 
org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.getPartitionPath(TimestampBasedAvroKeyGenerator.java:136)
 ~[hudi-spark3-bundle_2.12-0.8.0.jar:0.8.0]
        at 
org.apache.hudi.keygen.CustomAvroKeyGenerator.getPartitionPath(CustomAvroKeyGenerator.java:89)
 ~[hudi-spark3-bundle_2.12-0.8.0.jar:0.8.0]
        at 
org.apache.hudi.keygen.CustomKeyGenerator.getPartitionPath(CustomKeyGenerator.java:64)
 ~[hudi-spark3-bundle_2.12-0.8.0.jar:0.8.0]
        at 
org.apache.hudi.keygen.BaseKeyGenerator.getKey(BaseKeyGenerator.java:62) 
~[hudi-spark3-bundle_2.12-0.8.0.jar:0.8.0]
        at 
org.apache.hudi.HoodieSparkSqlWriter$.$anonfun$write$2(HoodieSparkSqlWriter.scala:160)
 ~[hudi-spark3-bundle_2.12-0.8.0.jar:0.8.0]
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:459) 
~[scala-library-2.12.10.jar:?]
        at scala.collection.Iterator$SliceIterator.next(Iterator.scala:271) 
~[scala-library-2.12.10.jar:?]
        at scala.collection.Iterator.foreach(Iterator.scala:941) 
~[scala-library-2.12.10.jar:?]
        at scala.collection.Iterator.foreach$(Iterator.scala:941) 
~[scala-library-2.12.10.jar:?]
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1429) 
~[scala-library-2.12.10.jar:?]
        at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62) 
~[scala-library-2.12.10.jar:?]
        at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53) 
~[scala-library-2.12.10.jar:?]
        at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105) 
~[scala-library-2.12.10.jar:?]
        at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49) 
~[scala-library-2.12.10.jar:?]
        at scala.collection.TraversableOnce.to(TraversableOnce.scala:315) 
~[scala-library-2.12.10.jar:?]
        at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313) 
~[scala-library-2.12.10.jar:?]
        at scala.collection.AbstractIterator.to(Iterator.scala:1429) 
~[scala-library-2.12.10.jar:?]
        at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307) 
~[scala-library-2.12.10.jar:?]
        at 
scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307) 
~[scala-library-2.12.10.jar:?]
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1429) 
~[scala-library-2.12.10.jar:?]
        at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294) 
~[scala-library-2.12.10.jar:?]
        at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288) 
~[scala-library-2.12.10.jar:?]
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1429) 
~[scala-library-2.12.10.jar:?]
        at org.apache.spark.rdd.RDD.$anonfun$take$2(RDD.scala:1449) 
~[spark-core_2.12-3.1.1.jar:3.1.1]
        at 
org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2242) 
~[spark-core_2.12-3.1.1.jar:3.1.1]
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) 
~[spark-core_2.12-3.1.1.jar:3.1.1]
        at org.apache.spark.scheduler.Task.run(Task.scala:131) 
~[spark-core_2.12-3.1.1.jar:3.1.1]
        at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
 ~[spark-core_2.12-3.1.1.jar:3.1.1]
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) 
~[spark-core_2.12-3.1.1.jar:3.1.1]
        at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500) 
~[spark-core_2.12-3.1.1.jar:3.1.1]
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
~[?:1.8.0_171]
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
~[?:1.8.0_171]
        at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_171]
Caused by: org.apache.hudi.exception.HoodieNotSupportedException: Unexpected 
type for partition field: java.time.LocalDate
        at 
org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.getPartitionPath(TimestampBasedAvroKeyGenerator.java:208)
 ~[hudi-spark3-bundle_2.12-0.8.0.jar:0.8.0]
        at 
org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.getPartitionPath(TimestampBasedAvroKeyGenerator.java:134)
 ~[hudi-spark3-bundle_2.12-0.8.0.jar:0.8.0]
        at 
org.apache.hudi.keygen.CustomAvroKeyGenerator.getPartitionPath(CustomAvroKeyGenerator.java:89)
 ~[hudi-spark3-bundle_2.12-0.8.0.jar:0.8.0]
        at 
org.apache.hudi.keygen.CustomKeyGenerator.getPartitionPath(CustomKeyGenerator.java:64)
 ~[hudi-spark3-bundle_2.12-0.8.0.jar:0.8.0]
        at 
org.apache.hudi.keygen.BaseKeyGenerator.getKey(BaseKeyGenerator.java:62) 
~[hudi-spark3-bundle_2.12-0.8.0.jar:0.8.0]
        at 
org.apache.hudi.HoodieSparkSqlWriter$.$anonfun$write$2(HoodieSparkSqlWriter.scala:160)
 ~[hudi-spark3-bundle_2.12-0.8.0.jar:0.8.0]
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:459) 
~[scala-library-2.12.10.jar:?]
        at scala.collection.Iterator$SliceIterator.next(Iterator.scala:271) 
~[scala-library-2.12.10.jar:?]
        at scala.collection.Iterator.foreach(Iterator.scala:941) 
~[scala-library-2.12.10.jar:?]
        at scala.collection.Iterator.foreach$(Iterator.scala:941) 
~[scala-library-2.12.10.jar:?]
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1429) 
~[scala-library-2.12.10.jar:?]
        at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62) 
~[scala-library-2.12.10.jar:?]
        at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53) 
~[scala-library-2.12.10.jar:?]
        at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105) 
~[scala-library-2.12.10.jar:?]
        at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49) 
~[scala-library-2.12.10.jar:?]
        at scala.collection.TraversableOnce.to(TraversableOnce.scala:315) 
~[scala-library-2.12.10.jar:?]
        at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313) 
~[scala-library-2.12.10.jar:?]
        at scala.collection.AbstractIterator.to(Iterator.scala:1429) 
~[scala-library-2.12.10.jar:?]
        at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307) 
~[scala-library-2.12.10.jar:?]
        at 
scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307) 
~[scala-library-2.12.10.jar:?]
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1429) 
~[scala-library-2.12.10.jar:?]
        at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294) 
~[scala-library-2.12.10.jar:?]
        at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288) 
~[scala-library-2.12.10.jar:?]
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1429) 
~[scala-library-2.12.10.jar:?]
        at org.apache.spark.rdd.RDD.$anonfun$take$2(RDD.scala:1449) 
~[spark-core_2.12-3.1.1.jar:3.1.1]
        at 
org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2242) 
~[spark-core_2.12-3.1.1.jar:3.1.1]
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) 
~[spark-core_2.12-3.1.1.jar:3.1.1]
        at org.apache.spark.scheduler.Task.run(Task.scala:131) 
~[spark-core_2.12-3.1.1.jar:3.1.1]
        at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
 ~[spark-core_2.12-3.1.1.jar:3.1.1]
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) 
~[spark-core_2.12-3.1.1.jar:3.1.1]
        at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500) 
~[spark-core_2.12-3.1.1.jar:3.1.1]
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
~[?:1.8.0_171]
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
~[?:1.8.0_171]
        at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_171]
{code}
Currently, the only supported column types are {{Double}}, {{Float}}, {{Long}} and {{CharSequence}}, as the existing implementation shows:
{code:java}
public String getPartitionPath(Object partitionVal) {
    initIfNeeded();
    long timeMs;
    if (partitionVal instanceof Double) {
      timeMs = convertLongTimeToMillis(((Double) partitionVal).longValue());
    } else if (partitionVal instanceof Float) {
      timeMs = convertLongTimeToMillis(((Float) partitionVal).longValue());
    } else if (partitionVal instanceof Long) {
      timeMs = convertLongTimeToMillis((Long) partitionVal);
    } else if (partitionVal instanceof CharSequence) {
      if (!inputFormatter.isPresent()) {
        throw new HoodieException("Missing inputformatter. Ensure " + 
Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP + " config is set when timestampType is 
DATE_STRING or MIXED!");
      }
      DateTime parsedDateTime = 
inputFormatter.get().parseDateTime(partitionVal.toString());
      if (this.outputDateTimeZone == null) {
        // Use the timezone that came off the date that was passed in, if it 
had one
        partitionFormatter = 
partitionFormatter.withZone(parsedDateTime.getZone());
      }      timeMs = 
inputFormatter.get().parseDateTime(partitionVal.toString()).getMillis();
    } else {
      throw new HoodieNotSupportedException(
          "Unexpected type for partition field: " + 
partitionVal.getClass().getName());
    }
    DateTime timestamp = new DateTime(timeMs, outputDateTimeZone);
    String partitionPath = timestamp.toString(partitionFormatter);
    if (encodePartitionPath) {
      try {
        partitionPath = URLEncoder.encode(partitionPath, 
StandardCharsets.UTF_8.toString());
      } catch (UnsupportedEncodingException uoe) {
        throw new HoodieException(uoe.getMessage(), uoe);
      }
    }
    return hiveStylePartitioning ? getPartitionPathFields().get(0) + "=" + 
partitionPath : partitionPath;
  }
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to