sergiimk commented on a change in pull request #12768:
URL: https://github.com/apache/flink/pull/12768#discussion_r446370561



##########
File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java
##########
@@ -102,7 +104,8 @@ public static MessageType toParquetType(TypeInformation<?> typeInformation, bool
                                        if (originalType != null) {
                                                switch (originalType) {
                                                        case DECIMAL:
-                                                               typeInfo = BasicTypeInfo.BIG_DEC_TYPE_INFO;
+                                                               DecimalMetadata meta = primitiveType.getDecimalMetadata();
+                                                               typeInfo = BigDecimalTypeInfo.of(meta.getPrecision(), meta.getScale());

Review comment:
       When the original implementation returns `BasicTypeInfo.BIG_DEC_TYPE_INFO` it basically discards the precision/scale information from the Parquet metadata. When you construct a table from such a row, its schema ends up with a `LEGACY('DECIMAL', 'DECIMAL')` type in it.
   
   My understanding is that for Flink SQL this type is equivalent to the maximum-precision `DECIMAL(38, 18)`. This is problematic when you do arithmetic operations on decimals, such as multiplication, because the derived result type is very prone to overflowing. So even though my Parquet files contain `DECIMAL(18, 4)`, in my SQL I had to do lots of casting:
   ```
   SELECT
     CAST(
       CAST(price as DECIMAL(18, 4)) * CAST(amount as DECIMAL(18, 4))
       as DECIMAL(18, 4)
     ) as value
   FROM ...
   ```
   ...basically re-casting the input values back into the precision/scale they already have in Parquet, to avoid overflowing and getting a silent `null` result.
   
   Switching this to `BigDecimalTypeInfo.of(...)` preserves the original 
precision/scale and allows me to simplify my query to:
   ```
   SELECT ..., CAST(price * amount as DECIMAL(18, 4)) as value FROM ...
   ```
   
   In my tests this works great except for **one problem**: when I inspect the schema of a table created from a `ParquetRowInputFormat` stream, it shows the data type as:
   ```
   LEGACY('RAW', 'ANY<java.math.BigDecimal, 
rO0ABXNyADtvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLkJpZ0RlY2ltYWxUeXBlSW5mbwAAAAAAAAABAgACSQAJcHJlY2lzaW9uSQAFc2NhbGV4cgAyb3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGVpbmZvLkJhc2ljVHlwZUluZm_6BPCCpWndBgIABEwABWNsYXp6dAARTGphdmEvbGFuZy9DbGFzcztMAA9jb21wYXJhdG9yQ2xhc3NxAH4AAlsAF3Bvc3NpYmxlQ2FzdFRhcmdldFR5cGVzdAASW0xqYXZhL2xhbmcvQ2xhc3M7TAAKc2VyaWFsaXplcnQANkxvcmcvYXBhY2hlL2ZsaW5rL2FwaS9jb21tb24vdHlwZXV0aWxzL1R5cGVTZXJpYWxpemVyO3hyADRvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZWluZm8uVHlwZUluZm9ybWF0aW9ulI3ISLqzeusCAAB4cHZyABRqYXZhLm1hdGguQmlnRGVjaW1hbFTHFVf5gShPAwACSQAFc2NhbGVMAAZpbnRWYWx0ABZMamF2YS9tYXRoL0JpZ0ludGVnZXI7eHIAEGphdmEubGFuZy5OdW1iZXKGrJUdC5TgiwIAAHhwdnIAO29yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBldXRpbHMuYmFzZS5CaWdEZWNDb21wYXJhdG9yAAAAAAAAAAECAAB4cgA-b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1dGlscy5iYXNlLkJhc2ljVHlwZUNvbXBhcmF0b3IAAAAAAAAAAQIAAloAE2FzY2VuZGluZ0NvbXBhcmlzb25bAAtjb21wYXJhdG9yc3QAN1tMb3JnL2FwYWNoZS9mbGluay9hcGkvY29tbW9uL3R5cGV1dGlscy9UeXBlQ29tcGFyYXRvcjt4cgA0b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1dGlscy5UeXBlQ29tcGFyYXRvcgAAAAAAAAABAgAAeHB1cgASW0xqYXZhLmxhbmcuQ2xhc3M7qxbXrsvNWpkCAAB4cAAAAABzcgA7b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1dGlscy5iYXNlLkJpZ0RlY1NlcmlhbGl6ZXIAAAAAAAAAAQIAAHhyAEJvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZXV0aWxzLmJhc2UuVHlwZVNlcmlhbGl6ZXJTaW5nbGV0b255qYeqxy53RQIAAHhyADRvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZXV0aWxzLlR5cGVTZXJpYWxpemVyAAAAAAAAAAECAAB4cAAAABIAAAAE>')
   ```
   Doing an identity operation like `SELECT * FROM input` somehow normalizes the schema back to the expected `DECIMAL(18, 4)`.
   
   I don't quite understand what's so different about `BIG_DEC_TYPE_INFO` and `BigDecimalTypeInfo.of(...)`, since both are instances of `BasicTypeInfo<BigDecimal>` ... I can only assume that somewhere in the `Table` implementation there's an equality check against `BIG_DEC_TYPE_INFO` instead of an `instanceof` check...
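   Purely to illustrate that guess (I have not found the actual code path, and these stand-in classes are hypothetical, not Flink's real ones): a reference-equality check against the singleton would send any subclass instance down the RAW/ANY path, while an `instanceof` check would accept it:

   ```java
   // Hypothetical sketch of the suspected bug pattern. Stand-in classes
   // only -- NOT Flink's real TypeInformation hierarchy.
   class BasicTypeInfoStub {
       static final BasicTypeInfoStub BIG_DEC_TYPE_INFO = new BasicTypeInfoStub();
   }

   class BigDecimalTypeInfoStub extends BasicTypeInfoStub {
       final int precision;
       final int scale;

       BigDecimalTypeInfoStub(int precision, int scale) {
           this.precision = precision;
           this.scale = scale;
       }
   }

   public class TypeCheckSketch {
       // Suspected pattern: only the exact singleton maps to DECIMAL,
       // any other BigDecimal type info falls through to RAW/ANY.
       static String mapByEquality(BasicTypeInfoStub typeInfo) {
           return typeInfo == BasicTypeInfoStub.BIG_DEC_TYPE_INFO ? "DECIMAL" : "RAW";
       }

       // What I would have expected: subclass instances map to DECIMAL too.
       static String mapByInstanceof(Object typeInfo) {
           return typeInfo instanceof BasicTypeInfoStub ? "DECIMAL" : "RAW";
       }

       public static void main(String[] args) {
           BigDecimalTypeInfoStub precise = new BigDecimalTypeInfoStub(18, 4);
           System.out.println(mapByEquality(precise));   // RAW -> the LEGACY('RAW', ...) symptom
           System.out.println(mapByInstanceof(precise)); // DECIMAL
       }
   }
   ```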
   
   I am very confused by Flink's type system(s), so would appreciate some 
pointers.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org