[
https://issues.apache.org/jira/browse/FLINK-17804?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17201300#comment-17201300
]
Dawid Wysakowicz commented on FLINK-17804:
------------------------------------------
Cross posting from github:
{quote}
Thank you for the work and sorry for not replying earlier.
The problem is that prior to the FLIP-37 effort we did not have support
for precision and scale. Therefore the BIG_DEC_TYPE_INFO in the Blink planner
assumes the maximum precision and scale of (38, 18) for that type. The
BigDecimalTypeInfo class is an internal class used in the Blink planner as a
temporary solution until the planner works entirely with the new type system,
so it is not the best choice to use at the edges of the system.
The best solution would be to update the format, or introduce a new one, to
work with the format interfaces introduced in FLIP-95.
Those interfaces no longer work with TypeInformation but with DataTypes,
which are closer to SQL types and fully support precision and scale.
If I am not mistaken, if you use the updated filesystem connector as described
here, you will get the correct behaviour, as it does not use the
ParquetRowInputFormat, but uses
org.apache.flink.formats.parquet.ParquetFileSystemFormatFactory.ParquetInputFormat#ParquetInputFormat
You can define such a connector as follows:
CREATE TABLE fs_table (
...
) PARTITIONED BY (dt, hour) WITH (
'connector'='filesystem',
'path'='...',
'format'='parquet'
);
That said, I am not sure this is a valid fix, as it might break existing
jobs. Let me know what you think, and whether using the connector I linked
works for you.
{quote}
> Follow the spec when decoding Parquet logical DECIMAL type
> ----------------------------------------------------------
>
> Key: FLINK-17804
> URL: https://issues.apache.org/jira/browse/FLINK-17804
> Project: Flink
> Issue Type: Bug
> Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
> Affects Versions: 1.10.1
> Reporter: Sergii Mikhtoniuk
> Priority: Major
> Labels: decimal, parquet, pull-request-available, spark
>
> When reading a Parquet file (produced by Spark 2.4.0 with the default
> configuration), Flink's {{ParquetRowInputFormat}} fails with a
> {{NumberFormatException}}.
> After debugging this, it seems that Flink doesn't follow the Parquet spec on
> [encoding the DECIMAL logical type|https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#decimal].
> The Parquet schema for this field is:
> {code}
> optional fixed_len_byte_array(9) price_usd (DECIMAL(19,4));
> {code}
> If I understand the spec correctly, the value should contain the big-endian
> two's-complement binary representation of the unscaled decimal value. Flink's
> [RowConverter|https://github.com/apache/flink/blob/release-1.10.1/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/RowConverter.java#L202]
> however treats it as a base-10 number encoded as a UTF-8 string.
> What Flink is essentially doing:
> {code}
> val binary = Array[Byte](0, 0, 0, 0, 0, 11, -97, 118, -64)
> val decimal = new java.math.BigDecimal(new String(binary, "UTF-8").toCharArray)
> {code}
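To see why this fails, the raw bytes can be fed through the same steps outside Flink. The bytes are not ASCII digits (some are not even valid UTF-8), so the `BigDecimal(char[])` constructor cannot parse the resulting string. A minimal standalone reproduction (class name is mine):

```java
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;

public class DecimalUtf8Repro {
    public static void main(String[] args) {
        // The raw FIXED_LEN_BYTE_ARRAY bytes from the Parquet file.
        byte[] binary = {0, 0, 0, 0, 0, 11, -97, 118, -64};
        try {
            // Interpreting the bytes as a base-10 UTF-8 string, as RowConverter does:
            new BigDecimal(new String(binary, StandardCharsets.UTF_8).toCharArray());
        } catch (NumberFormatException e) {
            // This is the exception seen in the stack trace.
            System.out.println("NumberFormatException");
        }
    }
}
```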
> What I think the spec suggests:
> {code}
> val binary = Array[Byte](0, 0, 0, 0, 0, 11, -97, 118, -64)
> val unscaled = new java.math.BigInteger(binary)
> val decimal = new java.math.BigDecimal(unscaled, 4) // scale 4 from DECIMAL(19,4)
> {code}
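For comparison, here is a self-contained sketch of the spec-compliant decoding in plain Java (only `java.math` is involved; the class and method names are mine, and the scale of 4 comes from the DECIMAL(19,4) declaration above):

```java
import java.math.BigDecimal;
import java.math.BigInteger;

public class SpecDecimalDecode {
    // Per the Parquet spec, FIXED_LEN_BYTE_ARRAY DECIMAL bytes hold a
    // big-endian two's-complement unscaled integer; the logical value
    // is unscaled * 10^(-scale).
    static BigDecimal decode(byte[] bytes, int scale) {
        return new BigDecimal(new BigInteger(bytes), scale);
    }

    public static void main(String[] args) {
        byte[] binary = {0, 0, 0, 0, 0, 11, -97, 118, -64};
        System.out.println(decode(binary, 4)); // prints 19500.0000
    }
}
```

The unscaled integer for these bytes is 195000000, so applying the scale yields 19500.0000 rather than the exception above.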
> Error stacktrace:
> {code}
> java.lang.NumberFormatException
> at java.math.BigDecimal.<init>(BigDecimal.java:497)
> at java.math.BigDecimal.<init>(BigDecimal.java:383)
> at java.math.BigDecimal.<init>(BigDecimal.java:680)
> at org.apache.flink.formats.parquet.utils.RowConverter$RowPrimitiveConverter.addBinary(RowConverter.java:202)
> at org.apache.parquet.column.impl.ColumnReaderImpl$2$6.writeValue(ColumnReaderImpl.java:317)
> at org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:367)
> at org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:406)
> at org.apache.flink.formats.parquet.utils.ParquetRecordReader.readNextRecord(ParquetRecordReader.java:235)
> at org.apache.flink.formats.parquet.utils.ParquetRecordReader.reachEnd(ParquetRecordReader.java:207)
> at org.apache.flink.formats.parquet.ParquetInputFormat.reachedEnd(ParquetInputFormat.java:233)
> at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:327)
> {code}
> Thanks!
--
This message was sent by Atlassian Jira
(v8.3.4#803005)