swuferhong commented on code in PR #20008:
URL: https://github.com/apache/flink/pull/20008#discussion_r920072511


##########
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetFileFormatFactory.java:
##########
@@ -142,5 +182,246 @@ public BulkFormat<RowData, FileSourceSplit> createRuntimeDecoder(
         public ChangelogMode getChangelogMode() {
             return ChangelogMode.insertOnly();
         }
+
+        @Override
+        public TableStats reportStatistics(List<Path> files, DataType producedDataType) {
+            try {
+                Configuration hadoopConfig = getParquetConfiguration(formatOptions);
+                Map<String, Statistics<?>> columnStatisticsMap = new HashMap<>();
+                RowType producedRowType = (RowType) producedDataType.getLogicalType();
+                long rowCount = 0;
+                for (Path file : files) {
+                    rowCount += updateStatistics(hadoopConfig, file, columnStatisticsMap);
+                }
+                Map<String, ColumnStats> columnStatsMap =
+                        convertToColumnStats(columnStatisticsMap, producedRowType);
+                return new TableStats(rowCount, columnStatsMap);
+            } catch (Exception e) {
+                return TableStats.UNKNOWN;
+            }
+        }
+
+        private Map<String, ColumnStats> convertToColumnStats(
+                Map<String, Statistics<?>> columnStatisticsMap, RowType producedRowType) {
+            Map<String, ColumnStats> columnStatMap = new HashMap<>();
+            for (String column : producedRowType.getFieldNames()) {
+                Statistics<?> statistics = columnStatisticsMap.get(column);
+                if (statistics == null) {
+                    continue;
+                }
+                ColumnStats columnStats =
+                        convertToColumnStats(
+                                producedRowType.getTypeAt(producedRowType.getFieldIndex(column)),
+                                statistics);
+                columnStatMap.put(column, columnStats);
+            }
+            return columnStatMap;
+        }
+
+        private ColumnStats convertToColumnStats(
+                LogicalType logicalType, Statistics<?> statistics) {
+            ColumnStats.Builder builder =
+                    new ColumnStats.Builder().setNullCount(statistics.getNumNulls());
+
+            switch (logicalType.getTypeRoot()) {
+                case BOOLEAN:
+                case BINARY:
+                case VARBINARY:
+                case ROW:
+                case ARRAY:
+                case MAP:
+                    break;

Review Comment:
   > these unsupported types can be moved to the `default` branch, and they have `nullCount` only.
   
   But for Parquet, all of these types are supported. The confusing point is that for complex types such as ROW, the statistics returned by Parquet have a wrong null count. Do they still need to be moved to the `default` branch?
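
To make the question concrete, here is a minimal, self-contained sketch of one possible way to treat the two groups differently. It is only an illustration under stated assumptions, not the code in this PR: `TypeRoot` and `nullCountFor` are hypothetical stand-ins (they are not Flink's `LogicalTypeRoot` or any Flink API), and returning `null` to mean "unknown" for complex types is an assumed convention chosen for the example.

```java
// Hypothetical sketch: decide which null count to report per type root.
// Nothing here is Flink or Parquet API; the names are stand-ins for illustration.
class NullCountSketch {

    // Stand-in for the type roots mentioned in the diff, plus two simple types.
    enum TypeRoot { BOOLEAN, BINARY, VARBINARY, ROW, ARRAY, MAP, INTEGER, VARCHAR }

    /**
     * Returns the null count to report for a column, or null when it is
     * treated as unknown (assumed convention for this sketch).
     */
    static Long nullCountFor(TypeRoot root, long reportedNulls) {
        switch (root) {
            case ROW:
            case ARRAY:
            case MAP:
                // Assumption taken from the comment above: the null count
                // returned for complex types is wrong, so do not report it.
                return null;
            default:
                // BOOLEAN, BINARY, VARBINARY and all other types keep the
                // reported null count; min/max handling is out of scope here.
                return reportedNulls;
        }
    }

    public static void main(String[] args) {
        // Print the decision for every stand-in type root.
        for (TypeRoot root : TypeRoot.values()) {
            System.out.println(root + " -> " + nullCountFor(root, 3L));
        }
    }
}
```

With this split, BOOLEAN, BINARY and VARBINARY could fall through to the `default` branch and keep `nullCount` only, while the complex types would need their own branch only if their null count has to be dropped.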



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
