[ 
https://issues.apache.org/jira/browse/FLINK-17804?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17201300#comment-17201300
 ] 

Dawid Wysakowicz commented on FLINK-17804:
------------------------------------------

Cross posting from github:

{quote}
Thank you for the work and sorry for not replying earlier.

The problem is that, prior to the FLIP-37 effort, we did not have support for 
precision and scale. Therefore the BIG_DEC_TYPE_INFO in the Blink planner 
assumes the maximum precision and scale of (38, 18) for that type. The 
BigDecimalTypeInfo class is an internal class used in the Blink planner as a 
temporary solution until the planner works entirely with the new type 
structure, so it is not the best solution to use it at the edges of the system.

The best solution would be to update the format, or introduce a new one, that 
works with the new format interfaces introduced in FLIP-95.
Those interfaces no longer work with TypeInformation but with DataTypes, which 
are closer to SQL types and fully support precision and scale.

If I am not mistaken, if you use an updated filesystem connector as described 
here, you will get the correct behaviour, as it does not use the 
ParquetRowInputFormat but instead uses 
org.apache.flink.formats.parquet.ParquetFileSystemFormatFactory.ParquetInputFormat#ParquetInputFormat.
Such a connector is declared as follows:

CREATE TABLE fs_table (
...
) PARTITIONED BY (dt, hour) WITH (
  'connector'='filesystem',
  'path'='...',
  'format'='parquet'
);

That said, I am not sure if this is a valid fix, as it might break existing 
jobs. Let me know what you think, and whether using the connector I linked 
works for you.
{quote}
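To make the decoding difference concrete, here is a minimal sketch in plain JDK code (not Flink's actual converter); the byte values and the scale of 4 are taken from the DECIMAL(19,4) example in the issue description below:

```scala
import java.math.{BigDecimal, BigInteger}

object DecimalDecode {
  def main(args: Array[String]): Unit = {
    // Raw fixed_len_byte_array bytes for the DECIMAL(19,4) column from the
    // issue: a big-endian two's-complement representation of the unscaled value.
    val binary = Array[Byte](0, 0, 0, 0, 0, 11, -97, 118, -64)

    // Interpreting those raw bytes as a base-10 UTF-8 string (the behaviour
    // reported in the issue) throws NumberFormatException:
    // new BigDecimal(new String(binary, "UTF-8").toCharArray)

    // Spec-compliant decoding: read the bytes as the unscaled integer, then
    // attach the scale declared by the column type (value = unscaled * 10^-scale).
    val unscaled = new BigInteger(binary)
    val scale = 4 // from DECIMAL(19,4)
    val decimal = new BigDecimal(unscaled, scale)
    println(decimal) // prints 19500.0000
  }
}
```

With these bytes the unscaled value is 195000000, so applying the scale of 4 yields 19500.0000, whereas the string-based interpretation cannot parse the raw bytes at all.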

> Follow the spec when decoding Parquet logical DECIMAL type
> ----------------------------------------------------------
>
>                 Key: FLINK-17804
>                 URL: https://issues.apache.org/jira/browse/FLINK-17804
>             Project: Flink
>          Issue Type: Bug
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>    Affects Versions: 1.10.1
>            Reporter: Sergii Mikhtoniuk
>            Priority: Major
>              Labels: decimal, parquet, pull-request-available, spark
>
> When reading a Parquet file (produced by Spark 2.4.0 with its default 
> configuration), Flink's {{ParquetRowInputFormat}} fails with a 
> {{NumberFormatException}}.
> After debugging, it seems that Flink doesn't follow the Parquet spec on 
> [encoding the DECIMAL logical 
> type|https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#decimal]
> The Parquet schema for this field is:
> {code}
> optional fixed_len_byte_array(9) price_usd (DECIMAL(19,4));
> {code}
> If I understand the spec correctly, it says that the value should contain a 
> binary (big-endian two's-complement) representation of the unscaled decimal. 
> Flink's 
> [RowConverter|https://github.com/apache/flink/blob/release-1.10.1/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/RowConverter.java#L202]
>  however treats it as a base-10 UTF-8 string.
> What Flink is essentially doing:
> {code}
> val binary = Array[Byte](0, 0, 0, 0, 0, 11, -97, 118, -64)
> val decimal = new java.math.BigDecimal(new String(binary, "UTF-8").toCharArray)
> {code}
> What I think the spec suggests:
> {code}
> val binary = Array[Byte](0, 0, 0, 0, 0, 11, -97, 118, -64)
> val unscaled = new java.math.BigInteger(binary)
> val decimal = new java.math.BigDecimal(unscaled, 4) // scale from DECIMAL(19,4)
> {code}
> Error stacktrace:
> {code}
> java.lang.NumberFormatException
>       at java.math.BigDecimal.<init>(BigDecimal.java:497)
>       at java.math.BigDecimal.<init>(BigDecimal.java:383)
>       at java.math.BigDecimal.<init>(BigDecimal.java:680)
>       at org.apache.flink.formats.parquet.utils.RowConverter$RowPrimitiveConverter.addBinary(RowConverter.java:202)
>       at org.apache.parquet.column.impl.ColumnReaderImpl$2$6.writeValue(ColumnReaderImpl.java:317)
>       at org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:367)
>       at org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:406)
>       at org.apache.flink.formats.parquet.utils.ParquetRecordReader.readNextRecord(ParquetRecordReader.java:235)
>       at org.apache.flink.formats.parquet.utils.ParquetRecordReader.reachEnd(ParquetRecordReader.java:207)
>       at org.apache.flink.formats.parquet.ParquetInputFormat.reachedEnd(ParquetInputFormat.java:233)
>       at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:327)
> {code}
> Thanks!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
