Matrix42 created HUDI-2592:
------------------------------
Summary: NumberFormatException: Zero length BigInteger when
write.precombine.field is decimal type
Key: HUDI-2592
URL: https://issues.apache.org/jira/browse/HUDI-2592
Project: Apache Hudi
Issue Type: Bug
Components: Common Core
Reporter: Matrix42
Fix For: 0.10.0, 0.11.0
When write.precombine.field is a decimal type, the decimal value is written as
an empty byte array, and reading it back throws NumberFormatException: Zero
length BigInteger, as shown below:
{code:java}
2021-10-20 17:14:03
java.lang.NumberFormatException: Zero length BigInteger
    at java.math.BigInteger.<init>(BigInteger.java:302)
    at org.apache.flink.table.data.DecimalData.fromUnscaledBytes(DecimalData.java:223)
    at org.apache.flink.connectors.hudi.util.AvroToRowDataConverters.lambda$createDecimalConverter$4dc14f00$1(AvroToRowDataConverters.java:158)
    at org.apache.flink.connectors.hudi.util.AvroToRowDataConverters.lambda$createNullableConverter$4568343a$1(AvroToRowDataConverters.java:94)
    at org.apache.flink.connectors.hudi.util.AvroToRowDataConverters.lambda$createRowConverter$68595fbd$1(AvroToRowDataConverters.java:75)
    at org.apache.flink.connectors.hudi.table.format.mor.MergeOnReadInputFormat$1.hasNext(MergeOnReadInputFormat.java:300)
    at org.apache.flink.connectors.hudi.table.format.mor.MergeOnReadInputFormat$LogFileOnlyIterator.reachedEnd(MergeOnReadInputFormat.java:362)
    at org.apache.flink.connectors.hudi.table.format.mor.MergeOnReadInputFormat.reachedEnd(MergeOnReadInputFormat.java:202)
    at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:90)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)
{code}
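Analysis:
HoodieAvroUtils.getNestedFieldVal is invoked to extract the precombine field,
and it then calls convertValueForAvroLogicalTypes. When the field is a decimal
type stored as bytes, the conversion consumes the ByteBuffer (its position is
left at the limit), so the record is later written with no remaining bytes.
We should rewind the buffer after the conversion.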
{code:java}
private static Object convertValueForAvroLogicalTypes(Schema fieldSchema, Object fieldValue) {
  if (fieldSchema.getLogicalType() == LogicalTypes.date()) {
    return LocalDate.ofEpochDay(Long.parseLong(fieldValue.toString()));
  } else if (fieldSchema.getLogicalType() instanceof LogicalTypes.Decimal) {
    Decimal dc = (Decimal) fieldSchema.getLogicalType();
    DecimalConversion decimalConversion = new DecimalConversion();
    if (fieldSchema.getType() == Schema.Type.FIXED) {
      return decimalConversion.fromFixed((GenericFixed) fieldValue, fieldSchema,
          LogicalTypes.decimal(dc.getPrecision(), dc.getScale()));
    } else if (fieldSchema.getType() == Schema.Type.BYTES) {
      // this method will consume the ByteBuffer
      return decimalConversion.fromBytes((ByteBuffer) fieldValue, fieldSchema,
          LogicalTypes.decimal(dc.getPrecision(), dc.getScale()));
    }
  }
  return fieldValue;
}
{code}
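The consume-then-rewind behaviour described above can be reproduced with the JDK alone. The following is a minimal sketch, not the Hudi or Avro code: the class DecimalBufferDemo and the methods consumingDecode and nonConsumingDecode are hypothetical names, and consumingDecode only assumes that the decimal conversion copies the bytes out with ByteBuffer.get(byte[]), which advances the buffer position.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;

// Hypothetical demo class; it only imitates the buffer handling described in the report.
final class DecimalBufferDemo {

    // Reads the buffer in place. get(byte[]) moves the position to the limit,
    // so the caller's buffer has zero remaining bytes afterwards.
    static BigDecimal consumingDecode(ByteBuffer value, int scale) {
        byte[] bytes = new byte[value.remaining()];
        value.get(bytes);
        return new BigDecimal(new BigInteger(bytes), scale);
    }

    // Reads through duplicate(), which shares the content but keeps its own
    // position, so the caller's buffer is left untouched.
    static BigDecimal nonConsumingDecode(ByteBuffer value, int scale) {
        return consumingDecode(value.duplicate(), scale);
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.wrap(new BigInteger("12345").toByteArray());

        System.out.println(consumingDecode(buf, 2)); // prints 123.45
        System.out.println(buf.remaining());         // prints 0

        try {
            // Second read sees an empty buffer, so BigInteger gets a zero-length array.
            consumingDecode(buf, 2);
        } catch (NumberFormatException e) {
            System.out.println("second read failed: " + e.getMessage());
        }

        // Option 1: rewind after the consuming read.
        buf.rewind();
        System.out.println(consumingDecode(buf, 2)); // prints 123.45

        // Option 2: never consume the caller's buffer in the first place.
        buf.rewind();
        System.out.println(nonConsumingDecode(buf, 2)); // prints 123.45
        System.out.println(buf.remaining());            // prints 2
    }
}
```

Either option keeps the bytes available for the later write. Rewinding matches the fix proposed in this report, while reading through duplicate() avoids mutating the caller's buffer at all.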