[
https://issues.apache.org/jira/browse/HUDI-1150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17171428#comment-17171428
]
wangxianghu commented on HUDI-1150:
-----------------------------------
[~Pratyaksh] timeUnit is only initialized when timestampType is one of
EPOCHMILLISECONDS, UNIX_TIMESTAMP, or SCALAR:
{code:java}
switch (this.timestampType) {
  case EPOCHMILLISECONDS:
    timeUnit = MILLISECONDS;
    break;
  case UNIX_TIMESTAMP:
    timeUnit = SECONDS;
    break;
  case SCALAR:
    String timeUnitStr = config.getString(Config.INPUT_TIME_UNIT,
        TimeUnit.SECONDS.toString());
    timeUnit = TimeUnit.valueOf(timeUnitStr.toUpperCase());
    break;
  default:
    timeUnit = null;
}
{code}
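To make the mapping above easy to check in isolation, here is a minimal standalone sketch of it. The class name, the enum, and the scalarUnit parameter (standing in for the configured Config.INPUT_TIME_UNIT value) are assumptions for illustration only, not the actual Hudi code.

```java
import java.util.concurrent.TimeUnit;

public class TimeUnitResolutionSketch {

    // Hypothetical stand-in for the key generator's timestamp types;
    // only the types mentioned in this comment are listed.
    enum TimestampType { UNIX_TIMESTAMP, EPOCHMILLISECONDS, SCALAR, DATE_STRING }

    // Mirrors the switch above. scalarUnit stands in for the configured
    // Config.INPUT_TIME_UNIT value and may be null when it is not set.
    static TimeUnit resolveTimeUnit(TimestampType type, String scalarUnit) {
        switch (type) {
            case EPOCHMILLISECONDS:
                return TimeUnit.MILLISECONDS;
            case UNIX_TIMESTAMP:
                return TimeUnit.SECONDS;
            case SCALAR:
                String unit = (scalarUnit == null) ? TimeUnit.SECONDS.toString() : scalarUnit;
                return TimeUnit.valueOf(unit.toUpperCase());
            default:
                // DATE_STRING (and anything else) gets no time unit.
                return null;
        }
    }

    public static void main(String[] args) {
        for (TimestampType type : TimestampType.values()) {
            System.out.println(type + " -> " + resolveTimeUnit(type, null));
        }
    }
}
```

With DATE_STRING the result is null, which is harmless only as long as no long value ever reaches the conversion method.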
If the user sets timestampType to DATE_STRING, timeUnit is never used, so this
logic is fine.
The exception occurs because the user set timestampType to DATE_STRING, but
when partitionVal is null the key generator substitutes a long value (1L). The
key generator then handles the value as if it were of type UNIX_TIMESTAMP or
EPOCHMILLISECONDS and calls
org.apache.hudi.keygen.TimestampBasedKeyGenerator#convertLongTimeToMillis,
which finally throws the exception
"hoodie.deltastreamer.keygen.timebased.timestamp.scalar.time.unit is not
specified but scalar it supplied as time value"
{code:java}
// partitionVal was set to 1L when its value was null, which effectively changes its timestampType
try {
  long timeMs;
  if (partitionVal instanceof Double) {
    timeMs = convertLongTimeToMillis(((Double) partitionVal).longValue());
  } else if (partitionVal instanceof Float) {
    timeMs = convertLongTimeToMillis(((Float) partitionVal).longValue());
  } else if (partitionVal instanceof Long) {
    timeMs = convertLongTimeToMillis((Long) partitionVal);
  } else if (partitionVal instanceof CharSequence) {
    DateTime parsedDateTime =
        inputFormatter.parseDateTime(partitionVal.toString());
    if (this.outputDateTimeZone == null) {
      // Use the timezone that came off the date that was passed in, if it had one
      partitionFormatter =
          partitionFormatter.withZone(parsedDateTime.getZone());
    }
    timeMs = inputFormatter.parseDateTime(partitionVal.toString()).getMillis();
  } else {
    throw new HoodieNotSupportedException(
        "Unexpected type for partition field: " +
        partitionVal.getClass().getName());
  }
  // ... remainder of the try block and its catch clause omitted from this excerpt
{code}
{code:java}
// this method won't be called if the timestampType is DATE_STRING and the input value is not null
private long convertLongTimeToMillis(Long partitionVal) {
  if (timeUnit == null) {
    // should not be possible
    throw new RuntimeException(Config.INPUT_TIME_UNIT
        + " is not specified but scalar it supplied as time value");
  }
  return MILLISECONDS.convert(partitionVal, timeUnit);
}
{code}
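The failure path described above can be reproduced in a few lines. The sketch below is a simplified model, not the Hudi implementation: the class, the describe helper, and the defaultByType method are hypothetical. In particular, defaultByType only illustrates one possible direction for a fix (a default that matches the configured type), and the placeholder string "1970-01-01" is an assumption that would have to match the configured input date format.

```java
import java.util.concurrent.TimeUnit;

public class NullPartitionValueSketch {

    // Hypothetical stand-in for the key generator's timestamp types.
    enum TimestampType { UNIX_TIMESTAMP, EPOCHMILLISECONDS, SCALAR, DATE_STRING }

    // Same null check as convertLongTimeToMillis above, with the unit passed in.
    static long convertLongTimeToMillis(long value, TimeUnit timeUnit) {
        if (timeUnit == null) {
            throw new RuntimeException("time unit is not specified but scalar is supplied as time value");
        }
        return TimeUnit.MILLISECONDS.convert(value, timeUnit);
    }

    // Reported behavior: a null partition value becomes 1L regardless of type.
    static Object defaultAlwaysLong(Object partitionVal) {
        return partitionVal == null ? Long.valueOf(1L) : partitionVal;
    }

    // Hypothetical alternative: choose a default that matches the timestamp type.
    // "1970-01-01" is a placeholder and assumes a matching input date format.
    static Object defaultByType(Object partitionVal, TimestampType type) {
        if (partitionVal != null) {
            return partitionVal;
        }
        return type == TimestampType.DATE_STRING ? "1970-01-01" : (Object) Long.valueOf(1L);
    }

    // Simplified dispatch: Long values need a time unit, strings do not.
    static String describe(Object partitionVal, TimeUnit timeUnit) {
        if (partitionVal instanceof Long) {
            return "millis=" + convertLongTimeToMillis((Long) partitionVal, timeUnit);
        } else if (partitionVal instanceof CharSequence) {
            return "date-string=" + partitionVal;
        }
        throw new IllegalArgumentException("Unexpected type: " + partitionVal.getClass().getName());
    }

    public static void main(String[] args) {
        TimeUnit dateStringUnit = null; // DATE_STRING leaves the time unit unset

        try {
            describe(defaultAlwaysLong(null), dateStringUnit);
        } catch (RuntimeException e) {
            System.out.println("1L default fails: " + e.getMessage());
        }

        Object typed = defaultByType(null, TimestampType.DATE_STRING);
        System.out.println("type-aware default: " + describe(typed, dateStringUnit));
    }
}
```

In this model the 1L default takes the Long branch and hits the null time unit, while a string default stays on the CharSequence branch and never needs one.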
> Fix unable to parse input partition field :1 exception when using
> TimestampBasedKeyGenerator
> ---------------------------------------------------------------------------------------------
>
> Key: HUDI-1150
> URL: https://issues.apache.org/jira/browse/HUDI-1150
> Project: Apache Hudi
> Issue Type: Bug
> Reporter: wangxianghu
> Assignee: wangxianghu
> Priority: Major
> Fix For: 0.6.0
>
>
> Steps to reproduce:
> # use TimestampBasedKeyGenerator
> # set hoodie.deltastreamer.keygen.timebased.timestamp.type = DATE_STRING
> # partitionpath field value is null
> When the partitionpath field value is null, TimestampBasedKeyGenerator sets it
> to 1L, which cannot be parsed correctly.
>
> {code:java}
> //
> User class threw exception: java.util.concurrent.ExecutionException:
> org.apache.hudi.exception.HoodieException: Job aborted due to stage failure:
> Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in
> stage 1.0 (TID 4, prod-t3-data-lake-007, executor 6):
> org.apache.hudi.exception.HoodieDeltaStreamerException: Unable to parse input
> partition field :1
> at
> org.apache.hudi.keygen.TimestampBasedKeyGenerator.getPartitionPath(TimestampBasedKeyGenerator.java:156)
> at
> org.apache.hudi.keygen.CustomKeyGenerator.getPartitionPath(CustomKeyGenerator.java:108)
> at
> org.apache.hudi.keygen.CustomKeyGenerator.getKey(CustomKeyGenerator.java:78)
> at
> org.apache.hudi.utilities.deltastreamer.DeltaSync.lambda$readFromSource$9fce03f0$1(DeltaSync.java:343)
> at
> org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1040)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
> at scala.collection.Iterator$$anon$10.next(Iterator.scala:394)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
> at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
> at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
> at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
> at scala.collection.AbstractIterator.to(Iterator.scala:1334)
> at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1334)
> at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
> at scala.collection.AbstractIterator.toArray(Iterator.scala:1334)
> at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1364)
> at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1364)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2121)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2121)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
> at org.apache.spark.scheduler.Task.run(Task.scala:121)
> at
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1408)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException:
> hoodie.deltastreamer.keygen.timebased.timestamp.scalar.time.unit is not
> specified but scalar it supplied as time value
> at
> org.apache.hudi.keygen.TimestampBasedKeyGenerator.convertLongTimeToMillis(TimestampBasedKeyGenerator.java:163)
> at
> org.apache.hudi.keygen.TimestampBasedKeyGenerator.getPartitionPath(TimestampBasedKeyGenerator.java:138)
> ... 29 more
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)