[ 
https://issues.apache.org/jira/browse/HUDI-1150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17171428#comment-17171428
 ] 

wangxianghu commented on HUDI-1150:
-----------------------------------

[~Pratyaksh] the timeUnit will only be initialized when the timestampType is 
one of EPOCHMILLISECONDS, UNIX_TIMESTAMP, or SCALAR.

 
{code:java}
switch (this.timestampType) {
  case EPOCHMILLISECONDS:
    timeUnit = MILLISECONDS;
    break;
  case UNIX_TIMESTAMP:
    timeUnit = SECONDS;
    break;
  case SCALAR:
    String timeUnitStr = config.getString(Config.INPUT_TIME_UNIT, TimeUnit.SECONDS.toString());
    timeUnit = TimeUnit.valueOf(timeUnitStr.toUpperCase());
    break;
  default:
    timeUnit = null;
}
{code}
If the user sets timestampType to DATE_STRING, the timeUnit will never be used, 
so the logic there is OK.
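For reference, these are the two properties involved, as they appear in the issue 
description and the exception message (a minimal sketch; the SCALAR/SECONDS values 
shown are only an example):
{code}
# selects the branch taken in the switch above
hoodie.deltastreamer.keygen.timebased.timestamp.type=SCALAR
# Config.INPUT_TIME_UNIT; only consulted for SCALAR (defaults to SECONDS)
hoodie.deltastreamer.keygen.timebased.timestamp.scalar.time.unit=SECONDS
{code}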

The reason the exception occurred is that the user specified timestampType as 
DATE_STRING, but when partitionVal is null the KeyGenerator substitutes a long 
value (1L). The KeyGenerator then treats it as a UNIX_TIMESTAMP or 
EPOCHMILLISECONDS value and calls 
org.apache.hudi.keygen.TimestampBasedKeyGenerator#convertLongTimeToMillis, so we 
finally get the exception 
"hoodie.deltastreamer.keygen.timebased.timestamp.scalar.time.unit is not 
specified but scalar it supplied as time value"

 
{code:java}
// partitionVal was set to 1L when its value was null, so its effective
// timestampType was changed
try {
  long timeMs;
  if (partitionVal instanceof Double) {
    timeMs = convertLongTimeToMillis(((Double) partitionVal).longValue());
  } else if (partitionVal instanceof Float) {
    timeMs = convertLongTimeToMillis(((Float) partitionVal).longValue());
  } else if (partitionVal instanceof Long) {
    timeMs = convertLongTimeToMillis((Long) partitionVal);
  } else if (partitionVal instanceof CharSequence) {
    DateTime parsedDateTime = inputFormatter.parseDateTime(partitionVal.toString());
    if (this.outputDateTimeZone == null) {
      // Use the timezone that came off the date that was passed in, if it had one
      partitionFormatter = partitionFormatter.withZone(parsedDateTime.getZone());
    }

    timeMs = inputFormatter.parseDateTime(partitionVal.toString()).getMillis();
  } else {
    throw new HoodieNotSupportedException(
        "Unexpected type for partition field: " + partitionVal.getClass().getName());
  }
  // ... (rest of method elided)
{code}
 
{code:java}
// This method won't be called if the timestampType is DATE_STRING and the
// input value is not null
private long convertLongTimeToMillis(Long partitionVal) {
  if (timeUnit == null) {
    // should not be possible
    throw new RuntimeException(Config.INPUT_TIME_UNIT
        + " is not specified but scalar it supplied as time value");
  }
  return MILLISECONDS.convert(partitionVal, timeUnit);
}

{code}
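To make the failure mode concrete, here is a self-contained sketch (not the actual 
Hudi code; the class name, enum, and exception message are simplified stand-ins) of 
the dispatch described above: with DATE_STRING the timeUnit stays null, so the 1L 
null-default falls into the Long branch and trips the null check.
{code:java}
import java.util.concurrent.TimeUnit;

class TimeUnitSketch {
  enum TimestampType { UNIX_TIMESTAMP, EPOCHMILLISECONDS, SCALAR, DATE_STRING }

  // Mirrors the switch above: timeUnit is only set for the three scalar-like types
  static Long toMillis(Object partitionVal, TimestampType type) {
    TimeUnit timeUnit;
    switch (type) {
      case EPOCHMILLISECONDS: timeUnit = TimeUnit.MILLISECONDS; break;
      case UNIX_TIMESTAMP:    timeUnit = TimeUnit.SECONDS; break;
      case SCALAR:            timeUnit = TimeUnit.SECONDS; break; // default unit
      default:                timeUnit = null; // DATE_STRING: never expected here
    }
    if (partitionVal instanceof Long) {
      if (timeUnit == null) {
        // the exception path from the report: a Long value with DATE_STRING config
        throw new RuntimeException("time unit is not specified but scalar is supplied");
      }
      return TimeUnit.MILLISECONDS.convert((Long) partitionVal, timeUnit);
    }
    return null; // string values would go through the date parser instead
  }

  public static void main(String[] args) {
    // A null partition value defaulted to 1L under DATE_STRING reproduces the failure:
    try {
      toMillis(1L, TimestampType.DATE_STRING);
    } catch (RuntimeException e) {
      System.out.println("reproduced: " + e.getMessage());
    }
    // With UNIX_TIMESTAMP the same 1L converts cleanly (prints 1000):
    System.out.println(toMillis(1L, TimestampType.UNIX_TIMESTAMP));
  }
}
{code}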
 

 

 

> Fix unable to parse input partition field :1 exception when using 
> TimestampBasedKeyGenerator 
> ---------------------------------------------------------------------------------------------
>
>                 Key: HUDI-1150
>                 URL: https://issues.apache.org/jira/browse/HUDI-1150
>             Project: Apache Hudi
>          Issue Type: Bug
>            Reporter: wangxianghu
>            Assignee: wangxianghu
>            Priority: Major
>             Fix For: 0.6.0
>
>
> Steps to reproduce:
>  # use TimestampBasedKeyGenerator
>  # set hoodie.deltastreamer.keygen.timebased.timestamp.type = DATE_STRING
>  # partitionpath field value is null
> When the partitionpath field value is null, TimestampBasedKeyGenerator will set it 
> to 1L, which cannot be parsed correctly.
>  
> {code:java}
> //
> User class threw exception: java.util.concurrent.ExecutionException: 
> org.apache.hudi.exception.HoodieException: Job aborted due to stage failure: 
> Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in 
> stage 1.0 (TID 4, prod-t3-data-lake-007, executor 6): 
> org.apache.hudi.exception.HoodieDeltaStreamerException: Unable to parse input 
> partition field :1
>  at 
> org.apache.hudi.keygen.TimestampBasedKeyGenerator.getPartitionPath(TimestampBasedKeyGenerator.java:156)
>  at 
> org.apache.hudi.keygen.CustomKeyGenerator.getPartitionPath(CustomKeyGenerator.java:108)
>  at 
> org.apache.hudi.keygen.CustomKeyGenerator.getKey(CustomKeyGenerator.java:78)
>  at 
> org.apache.hudi.utilities.deltastreamer.DeltaSync.lambda$readFromSource$9fce03f0$1(DeltaSync.java:343)
>  at 
> org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1040)
>  at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
>  at scala.collection.Iterator$$anon$10.next(Iterator.scala:394)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>  at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
>  at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
>  at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
>  at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
>  at scala.collection.AbstractIterator.to(Iterator.scala:1334)
>  at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
>  at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1334)
>  at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
>  at scala.collection.AbstractIterator.toArray(Iterator.scala:1334)
>  at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1364)
>  at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1364)
>  at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2121)
>  at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2121)
>  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>  at org.apache.spark.scheduler.Task.run(Task.scala:121)
>  at 
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
>  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1408)
>  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
>  Caused by: java.lang.RuntimeException: 
> hoodie.deltastreamer.keygen.timebased.timestamp.scalar.time.unit is not 
> specified but scalar it supplied as time value
>  at 
> org.apache.hudi.keygen.TimestampBasedKeyGenerator.convertLongTimeToMillis(TimestampBasedKeyGenerator.java:163)
>  at 
> org.apache.hudi.keygen.TimestampBasedKeyGenerator.getPartitionPath(TimestampBasedKeyGenerator.java:138)
>  ... 29 more
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
