dawidwys commented on a change in pull request #12768:
URL: https://github.com/apache/flink/pull/12768#discussion_r470560128



##########
File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java
##########
@@ -102,7 +104,8 @@ public static MessageType toParquetType(TypeInformation<?> 
typeInformation, bool
                                        if (originalType != null) {
                                                switch (originalType) {
                                                        case DECIMAL:
-                                                               typeInfo = 
BasicTypeInfo.BIG_DEC_TYPE_INFO;
+                                                               DecimalMetadata 
meta = primitiveType.getDecimalMetadata();
+                                                               typeInfo = 
BigDecimalTypeInfo.of(meta.getPrecision(), meta.getScale());

Review comment:
       Hey @sergiimk,
   Thank you for the work and sorry for not replying earlier.
   
   The problem is that previous to the effort of 
[FLIP-37](https://cwiki.apache.org/confluence/display/FLINK/FLIP-37%3A+Rework+of+the+Table+API+Type+System)
 we did not have a support for precision and scale. Therefore the 
`BIG_DEC_TYPE_INFO` in the Blink planner assumes the maximum prescision `38,18` 
for that type. The `BigDecimalTypeInfo` class is an internal class used in the 
Blink planner as a temporary solution until the planner works entirely with the 
new types structure. Therefore it is not the best solution to use it at the 
edges of the system.
   
   The best solution would be to update/introduce a new format that would work 
with the new format interfaces introduces in 
[FLIP-95](https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces).
   Those interfaces do not work any longer with `TypeInformations` but with 
`DataTypes` which are closer to SQL types and fully support precision and scale.
   
   If I am not mistaken if you use an updated `filesystem` connector as 
described 
[here](https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#filesystem-sql-connector),
 you will get a correct behaviour, as it does not use the 
`ParquetRowInputFormat`, but uses 
`org.apache.flink.formats.parquet.ParquetFileSystemFormatFactory.ParquetInputFormat#ParquetInputFormat`
   If you use a connector as folows:
   ```
   CREATE TABLE fs_table (
   ...
   ) PARTITIONED BY (dt, hour) WITH (
     'connector'='filesystem',
     'path'='...',
     'format'='parquet'
   );
   ```
   
   That said I am not sure if this is a valid fix, as it might break existing 
jobs. Let me know what you think, and if using the connector I linked works for 
you.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to