[
https://issues.apache.org/jira/browse/FLINK-17804?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Flink Jira Bot updated FLINK-17804:
-----------------------------------
Labels: auto-deprioritized-major decimal parquet pull-request-available
spark stale-minor (was: auto-deprioritized-major decimal parquet
pull-request-available spark)
I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help
the community manage its development. I see this issue has been marked as
Minor but is unassigned and neither itself nor its Sub-Tasks have been updated
for 180 days. I have gone ahead and marked it "stale-minor". If this ticket is
still Minor, please either assign yourself or give an update. Afterwards,
please remove the label or in 7 days the issue will be deprioritized.
> Follow the spec when decoding Parquet logical DECIMAL type
> ----------------------------------------------------------
>
> Key: FLINK-17804
> URL: https://issues.apache.org/jira/browse/FLINK-17804
> Project: Flink
> Issue Type: Bug
> Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
> Affects Versions: 1.10.1
> Reporter: Sergii Mikhtoniuk
> Priority: Minor
> Labels: auto-deprioritized-major, decimal, parquet,
> pull-request-available, spark, stale-minor
>
> When reading a Parquet file (produced by Spark 2.4.0 with default
> configuration) Flink's {{ParquetRowInputFormat}} fails with
> {{NumberFormatException}}.
> After debugging this, it seems that Flink doesn't follow the Parquet spec on
> [encoding the DECIMAL logical
> type|https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#decimal].
> The Parquet schema for this field is:
> {code}
> optional fixed_len_byte_array(9) price_usd (DECIMAL(19,4));
> {code}
> If I understand the spec correctly, it says that the value should contain a
> binary representation of an unscaled decimal. Flink's
> [RowConverter|https://github.com/apache/flink/blob/release-1.10.1/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/RowConverter.java#L202]
> however treats it as a base-10 UTF-8 string.
> What Flink is essentially doing:
> {code}
> val binary = Array[Byte](0, 0, 0, 0, 0, 11, -97, 118, -64)
> val decimal = new java.math.BigDecimal(new String(binary, "UTF-8").toCharArray)
> {code}
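> What I think the spec suggests:
> {code}
> val binary = Array[Byte](0, 0, 0, 0, 0, 11, -97, 118, -64)
> // The bytes are a big-endian two's-complement unscaled integer.
> val unscaled = new java.math.BigInteger(binary)
> // The scale (4) comes from the DECIMAL(19,4) annotation in the schema.
> val decimal = new java.math.BigDecimal(unscaled, 4)
> {code}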
> Error stacktrace:
> {code}
> java.lang.NumberFormatException
> at java.math.BigDecimal.<init>(BigDecimal.java:497)
> at java.math.BigDecimal.<init>(BigDecimal.java:383)
> at java.math.BigDecimal.<init>(BigDecimal.java:680)
> at org.apache.flink.formats.parquet.utils.RowConverter$RowPrimitiveConverter.addBinary(RowConverter.java:202)
> at org.apache.parquet.column.impl.ColumnReaderImpl$2$6.writeValue(ColumnReaderImpl.java:317)
> at org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:367)
> at org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:406)
> at org.apache.flink.formats.parquet.utils.ParquetRecordReader.readNextRecord(ParquetRecordReader.java:235)
> at org.apache.flink.formats.parquet.utils.ParquetRecordReader.reachEnd(ParquetRecordReader.java:207)
> at org.apache.flink.formats.parquet.ParquetInputFormat.reachedEnd(ParquetInputFormat.java:233)
> at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:327)
> {code}
> Thanks!
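
The two decoding paths described in the report can be compared in plain Java, the language of RowConverter. This is only a minimal sketch, not Flink's actual code or the eventual fix: the class DecimalDecodeSketch and both helper methods are hypothetical names. The scale of 4 is assumed from the DECIMAL(19,4) annotation in the schema quoted above, and the byte array is the sample value from the report.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;

class DecimalDecodeSketch {

    // Hypothetical helper: treats the bytes as a big-endian two's-complement
    // unscaled integer and applies the scale declared in the Parquet schema.
    static BigDecimal decodeDecimal(byte[] bytes, int scale) {
        return new BigDecimal(new BigInteger(bytes), scale);
    }

    // Hypothetical helper mirroring the string-based parsing described in the
    // report: the raw bytes are read as UTF-8 text and parsed as a base-10 number.
    static BigDecimal decodeAsUtf8String(byte[] bytes) {
        return new BigDecimal(new String(bytes, StandardCharsets.UTF_8).toCharArray());
    }

    public static void main(String[] args) {
        // Sample value from the report: fixed_len_byte_array(9), DECIMAL(19,4).
        byte[] binary = {0, 0, 0, 0, 0, 11, -97, 118, -64};

        // Unscaled value 195000000 with scale 4.
        System.out.println(decodeDecimal(binary, 4)); // prints 19500.0000

        try {
            decodeAsUtf8String(binary);
        } catch (NumberFormatException e) {
            // The bytes are not decimal digits, so text parsing is rejected.
            System.out.println("string-based parsing failed: " + e);
        }
    }
}
```

Passing the scale separately keeps the helper independent of any particular column. A real converter would presumably read it from the column's DECIMAL annotation rather than hard-coding it.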
--
This message was sent by Atlassian Jira
(v8.3.4#803005)