[
https://issues.apache.org/jira/browse/HUDI-1864?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ethan Guo updated HUDI-1864:
----------------------------
Fix Version/s: 0.14.0
(was: 0.13.0)
> Support for java.time.LocalDate in TimestampBasedAvroKeyGenerator
> -----------------------------------------------------------------
>
> Key: HUDI-1864
> URL: https://issues.apache.org/jira/browse/HUDI-1864
> Project: Apache Hudi
> Issue Type: Improvement
> Reporter: Vaibhav Sinha
> Assignee: sivabalan narayanan
> Priority: Major
> Labels: pull-request-available, query-eng, sev:high
> Fix For: 0.14.0
>
>
> When we read data from a MySQL table that has a column of type {{Date}}, Spark
> represents it as an instance of {{java.time.LocalDate}}. If I try to use
> this column for partitioning while writing to Hudi, I get the following
> exception:
>
> {code:java}
> Caused by: org.apache.hudi.exception.HoodieKeyGeneratorException: Unable to
> parse input partition field :2021-04-21
> at
> org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.getPartitionPath(TimestampBasedAvroKeyGenerator.java:136)
> ~[hudi-spark3-bundle_2.12-0.8.0.jar:0.8.0]
> at
> org.apache.hudi.keygen.CustomAvroKeyGenerator.getPartitionPath(CustomAvroKeyGenerator.java:89)
> ~[hudi-spark3-bundle_2.12-0.8.0.jar:0.8.0]
> at
> org.apache.hudi.keygen.CustomKeyGenerator.getPartitionPath(CustomKeyGenerator.java:64)
> ~[hudi-spark3-bundle_2.12-0.8.0.jar:0.8.0]
> at
> org.apache.hudi.keygen.BaseKeyGenerator.getKey(BaseKeyGenerator.java:62)
> ~[hudi-spark3-bundle_2.12-0.8.0.jar:0.8.0]
> at
> org.apache.hudi.HoodieSparkSqlWriter$.$anonfun$write$2(HoodieSparkSqlWriter.scala:160)
> ~[hudi-spark3-bundle_2.12-0.8.0.jar:0.8.0]
> at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
> ~[scala-library-2.12.10.jar:?]
> at scala.collection.Iterator$SliceIterator.next(Iterator.scala:271)
> ~[scala-library-2.12.10.jar:?]
> at scala.collection.Iterator.foreach(Iterator.scala:941)
> ~[scala-library-2.12.10.jar:?]
> at scala.collection.Iterator.foreach$(Iterator.scala:941)
> ~[scala-library-2.12.10.jar:?]
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
> ~[scala-library-2.12.10.jar:?]
> at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
> ~[scala-library-2.12.10.jar:?]
> at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
> ~[scala-library-2.12.10.jar:?]
> at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
> ~[scala-library-2.12.10.jar:?]
> at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
> ~[scala-library-2.12.10.jar:?]
> at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
> ~[scala-library-2.12.10.jar:?]
> at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
> ~[scala-library-2.12.10.jar:?]
> at scala.collection.AbstractIterator.to(Iterator.scala:1429)
> ~[scala-library-2.12.10.jar:?]
> at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
> ~[scala-library-2.12.10.jar:?]
> at
> scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
> ~[scala-library-2.12.10.jar:?]
> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1429)
> ~[scala-library-2.12.10.jar:?]
> at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
> ~[scala-library-2.12.10.jar:?]
> at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
> ~[scala-library-2.12.10.jar:?]
> at scala.collection.AbstractIterator.toArray(Iterator.scala:1429)
> ~[scala-library-2.12.10.jar:?]
> at org.apache.spark.rdd.RDD.$anonfun$take$2(RDD.scala:1449)
> ~[spark-core_2.12-3.1.1.jar:3.1.1]
> at
> org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2242)
> ~[spark-core_2.12-3.1.1.jar:3.1.1]
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
> ~[spark-core_2.12-3.1.1.jar:3.1.1]
> at org.apache.spark.scheduler.Task.run(Task.scala:131)
> ~[spark-core_2.12-3.1.1.jar:3.1.1]
> at
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
> ~[spark-core_2.12-3.1.1.jar:3.1.1]
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
> ~[spark-core_2.12-3.1.1.jar:3.1.1]
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
> ~[spark-core_2.12-3.1.1.jar:3.1.1]
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> ~[?:1.8.0_171]
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> ~[?:1.8.0_171]
> at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_171]
> Caused by: org.apache.hudi.exception.HoodieNotSupportedException: Unexpected
> type for partition field: java.time.LocalDate
> at
> org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.getPartitionPath(TimestampBasedAvroKeyGenerator.java:208)
> ~[hudi-spark3-bundle_2.12-0.8.0.jar:0.8.0]
> at
> org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.getPartitionPath(TimestampBasedAvroKeyGenerator.java:134)
> ~[hudi-spark3-bundle_2.12-0.8.0.jar:0.8.0]
> at
> org.apache.hudi.keygen.CustomAvroKeyGenerator.getPartitionPath(CustomAvroKeyGenerator.java:89)
> ~[hudi-spark3-bundle_2.12-0.8.0.jar:0.8.0]
> at
> org.apache.hudi.keygen.CustomKeyGenerator.getPartitionPath(CustomKeyGenerator.java:64)
> ~[hudi-spark3-bundle_2.12-0.8.0.jar:0.8.0]
> at
> org.apache.hudi.keygen.BaseKeyGenerator.getKey(BaseKeyGenerator.java:62)
> ~[hudi-spark3-bundle_2.12-0.8.0.jar:0.8.0]
> at
> org.apache.hudi.HoodieSparkSqlWriter$.$anonfun$write$2(HoodieSparkSqlWriter.scala:160)
> ~[hudi-spark3-bundle_2.12-0.8.0.jar:0.8.0]
> at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
> ~[scala-library-2.12.10.jar:?]
> at scala.collection.Iterator$SliceIterator.next(Iterator.scala:271)
> ~[scala-library-2.12.10.jar:?]
> at scala.collection.Iterator.foreach(Iterator.scala:941)
> ~[scala-library-2.12.10.jar:?]
> at scala.collection.Iterator.foreach$(Iterator.scala:941)
> ~[scala-library-2.12.10.jar:?]
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
> ~[scala-library-2.12.10.jar:?]
> at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
> ~[scala-library-2.12.10.jar:?]
> at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
> ~[scala-library-2.12.10.jar:?]
> at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
> ~[scala-library-2.12.10.jar:?]
> at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
> ~[scala-library-2.12.10.jar:?]
> at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
> ~[scala-library-2.12.10.jar:?]
> at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
> ~[scala-library-2.12.10.jar:?]
> at scala.collection.AbstractIterator.to(Iterator.scala:1429)
> ~[scala-library-2.12.10.jar:?]
> at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
> ~[scala-library-2.12.10.jar:?]
> at
> scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
> ~[scala-library-2.12.10.jar:?]
> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1429)
> ~[scala-library-2.12.10.jar:?]
> at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
> ~[scala-library-2.12.10.jar:?]
> at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
> ~[scala-library-2.12.10.jar:?]
> at scala.collection.AbstractIterator.toArray(Iterator.scala:1429)
> ~[scala-library-2.12.10.jar:?]
> at org.apache.spark.rdd.RDD.$anonfun$take$2(RDD.scala:1449)
> ~[spark-core_2.12-3.1.1.jar:3.1.1]
> at
> org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2242)
> ~[spark-core_2.12-3.1.1.jar:3.1.1]
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
> ~[spark-core_2.12-3.1.1.jar:3.1.1]
> at org.apache.spark.scheduler.Task.run(Task.scala:131)
> ~[spark-core_2.12-3.1.1.jar:3.1.1]
> at
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
> ~[spark-core_2.12-3.1.1.jar:3.1.1]
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
> ~[spark-core_2.12-3.1.1.jar:3.1.1]
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
> ~[spark-core_2.12-3.1.1.jar:3.1.1]
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> ~[?:1.8.0_171]
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> ~[?:1.8.0_171]
> at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_171]
> {code}
> Currently, the only supported column types are {{Double}}, {{Float}},
> {{Long}} and {{CharSequence}}, as the implementation below shows:
> {code:java}
> public String getPartitionPath(Object partitionVal) {
> initIfNeeded();
> long timeMs;
> if (partitionVal instanceof Double) {
> timeMs = convertLongTimeToMillis(((Double) partitionVal).longValue());
> } else if (partitionVal instanceof Float) {
> timeMs = convertLongTimeToMillis(((Float) partitionVal).longValue());
> } else if (partitionVal instanceof Long) {
> timeMs = convertLongTimeToMillis((Long) partitionVal);
> } else if (partitionVal instanceof CharSequence) {
> if (!inputFormatter.isPresent()) {
> throw new HoodieException("Missing inputformatter. Ensure " +
> Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP + " config is set when timestampType
> is DATE_STRING or MIXED!");
> }
> DateTime parsedDateTime =
> inputFormatter.get().parseDateTime(partitionVal.toString());
> if (this.outputDateTimeZone == null) {
> // Use the timezone that came off the date that was passed in, if it
> had one
> partitionFormatter =
> partitionFormatter.withZone(parsedDateTime.getZone());
> } timeMs =
> inputFormatter.get().parseDateTime(partitionVal.toString()).getMillis();
> } else {
> throw new HoodieNotSupportedException(
> "Unexpected type for partition field: " +
> partitionVal.getClass().getName());
> }
> DateTime timestamp = new DateTime(timeMs, outputDateTimeZone);
> String partitionPath = timestamp.toString(partitionFormatter);
> if (encodePartitionPath) {
> try {
> partitionPath = URLEncoder.encode(partitionPath,
> StandardCharsets.UTF_8.toString());
> } catch (UnsupportedEncodingException uoe) {
> throw new HoodieException(uoe.getMessage(), uoe);
> }
> }
> return hiveStylePartitioning ? getPartitionPathFields().get(0) + "=" +
> partitionPath : partitionPath;
> }
> {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)