swuferhong commented on code in PR #20008:
URL: https://github.com/apache/flink/pull/20008#discussion_r920081161


##########
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetFileFormatFactory.java:
##########
@@ -142,5 +182,246 @@ public BulkFormat<RowData, FileSourceSplit> createRuntimeDecoder(
         public ChangelogMode getChangelogMode() {
             return ChangelogMode.insertOnly();
         }
+
+        @Override
+        public TableStats reportStatistics(List<Path> files, DataType producedDataType) {
+            try {
+                Configuration hadoopConfig = getParquetConfiguration(formatOptions);
+                Map<String, Statistics<?>> columnStatisticsMap = new HashMap<>();
+                RowType producedRowType = (RowType) producedDataType.getLogicalType();
+                long rowCount = 0;
+                for (Path file : files) {
+                    rowCount += updateStatistics(hadoopConfig, file, columnStatisticsMap);
+                }
+                Map<String, ColumnStats> columnStatsMap =
+                        convertToColumnStats(columnStatisticsMap, producedRowType);
+                return new TableStats(rowCount, columnStatsMap);
+            } catch (Exception e) {
+                return TableStats.UNKNOWN;
+            }
+        }
+
+        private Map<String, ColumnStats> convertToColumnStats(
+                Map<String, Statistics<?>> columnStatisticsMap, RowType producedRowType) {
+            Map<String, ColumnStats> columnStatMap = new HashMap<>();
+            for (String column : producedRowType.getFieldNames()) {
+                Statistics<?> statistics = columnStatisticsMap.get(column);
+                if (statistics == null) {
+                    continue;
+                }
+                ColumnStats columnStats =
+                        convertToColumnStats(
+                                producedRowType.getTypeAt(producedRowType.getFieldIndex(column)),
+                                statistics);
+                columnStatMap.put(column, columnStats);
+            }
+            return columnStatMap;
+        }
+
+        private ColumnStats convertToColumnStats(
+                LogicalType logicalType, Statistics<?> statistics) {
+            ColumnStats.Builder builder =
+                    new ColumnStats.Builder().setNullCount(statistics.getNumNulls());
+
+            switch (logicalType.getTypeRoot()) {
+                case BOOLEAN:
+                case BINARY:
+                case VARBINARY:
+                case ROW:
+                case ARRAY:
+                case MAP:
+                    break;
+                case TINYINT:
+                case SMALLINT:
+                case INTEGER:
+                case BIGINT:
+                    if (statistics instanceof IntStatistics) {
+                        builder.setMin(((IntStatistics) statistics).getMin())
+                                .setMax(((IntStatistics) statistics).getMax());
+                        break;
+                    } else if (statistics instanceof LongStatistics) {
+                        builder.setMin(((LongStatistics) statistics).getMin())
+                                .setMax(((LongStatistics) statistics).getMax());
+                        break;
+                    } else {
+                        return null;
+                    }
+                case DOUBLE:
+                    if (statistics instanceof DoubleStatistics) {
+                        builder.setMin(((DoubleStatistics) statistics).getMin())
+                                .setMax(((DoubleStatistics) statistics).getMax());
+                        break;
+                    } else {
+                        return null;
+                    }
+                case FLOAT:
+                    if (statistics instanceof FloatStatistics) {
+                        builder.setMin(((FloatStatistics) statistics).getMin())
+                                .setMax(((FloatStatistics) statistics).getMax());
+                        break;
+                    } else {
+                        return null;
+                    }
+                case DATE:
+                    if (statistics instanceof IntStatistics) {
+                        Date min =
+                                Date.valueOf(
+                                        DateTimeUtils.formatDate(
+                                                ((IntStatistics) statistics).getMin()));
+                        Date max =
+                                Date.valueOf(
+                                        DateTimeUtils.formatDate(
+                                                ((IntStatistics) statistics).getMax()));
+                        builder.setMin(min).setMax(max);
+                        break;
+                    } else {
+                        return null;
+                    }
+                case TIME_WITHOUT_TIME_ZONE:
+                    if (statistics instanceof IntStatistics) {
+                        Time min =
+                                Time.valueOf(
+                                        DateTimeUtils.toLocalTime(
+                                                ((IntStatistics) statistics).getMin()));
+                        Time max =
+                                Time.valueOf(
+                                        DateTimeUtils.toLocalTime(
+                                                ((IntStatistics) statistics).getMax()));
+                        builder.setMin(min).setMax(max);
+                        break;
+                    } else {
+                        return null;
+                    }
+                case CHAR:
+                case VARCHAR:
+                    if (statistics instanceof BinaryStatistics) {
+                        Binary min = ((BinaryStatistics) statistics).genericGetMin();
+                        Binary max = ((BinaryStatistics) statistics).genericGetMax();
+                        if (min != null) {
+                            builder.setMin(min.toStringUsingUTF8());
+                        } else {
+                            builder.setMin(null);
+                        }
+                        if (max != null) {
+                            builder.setMax(max.toStringUsingUTF8());
+                        } else {
+                            builder.setMax(null);
+                        }
+                        break;
+                    } else {
+                        return null;
+                    }
+                case TIMESTAMP_WITHOUT_TIME_ZONE:
+                case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                case TIMESTAMP_WITH_TIME_ZONE:
+                    if (statistics instanceof LongStatistics) {
+                        builder.setMin(new Timestamp(((LongStatistics) statistics).getMin()))
+                                .setMax(new Timestamp(((LongStatistics) statistics).getMax()));
+                        break;
+                    } else if (statistics instanceof BinaryStatistics) {
+                        Binary min = ((BinaryStatistics) statistics).genericGetMin();
+                        Binary max = ((BinaryStatistics) statistics).genericGetMax();
+                        if (min != null) {
+                            builder.setMin(binaryToTimestamp(min, formatOptions.get(UTC_TIMEZONE)));
+                        } else {
+                            builder.setMin(null);
+                        }
+                        if (max != null) {
+                            builder.setMax(binaryToTimestamp(max, formatOptions.get(UTC_TIMEZONE)));
+                        } else {
+                            builder.setMax(null);
+                        }
+                        break;
+                    } else {
+                        return null;
+                    }
+                case DECIMAL:
+                    if (statistics instanceof IntStatistics) {
+                        builder.setMin(BigDecimal.valueOf(((IntStatistics) statistics).getMin()))
+                                .setMax(BigDecimal.valueOf(((IntStatistics) statistics).getMax()));
+                        break;
+                    } else if (statistics instanceof LongStatistics) {
+                        builder.setMin(BigDecimal.valueOf(((LongStatistics) statistics).getMin()))
+                                .setMax(BigDecimal.valueOf(((LongStatistics) statistics).getMax()));
+                        break;
+                    } else if (statistics instanceof BinaryStatistics) {
+                        Binary min = ((BinaryStatistics) statistics).genericGetMin();
+                        Binary max = ((BinaryStatistics) statistics).genericGetMax();
+                        if (min != null) {
+                            builder.setMin(
+                                    binaryToDecimal(min, ((DecimalType) logicalType).getScale()));
+                        } else {
+                            builder.setMin(null);
+                        }
+                        if (max != null) {
+                            builder.setMax(
+                                    binaryToDecimal(max, ((DecimalType) logicalType).getScale()));
+                        } else {
+                            builder.setMax(null);
+                        }
+                        break;
+                    } else {
+                        return null;
+                    }

Review Comment:
   > only one `break` is needed
   
   done!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to