godfreyhe commented on code in PR #20009:
URL: https://github.com/apache/flink/pull/20009#discussion_r920036735


##########
flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileFormatFactory.java:
##########
@@ -167,5 +190,181 @@ public ChangelogMode getChangelogMode() {
         public void applyFilters(List<ResolvedExpression> filters) {
             this.filters = filters;
         }
+
+        @Override
+        public TableStats reportStatistics(List<Path> files, DataType 
producedDataType) {
+            try {
+                Properties properties = getOrcProperties(formatOptions);
+                Configuration hadoopConfig = new Configuration();
+                properties.forEach((k, v) -> hadoopConfig.set(k.toString(), 
v.toString()));
+
+                long rowCount = 0;
+                Map<String, ColumnStatistics> columnStatisticsMap = new 
HashMap<>();
+                RowType producedRowType = (RowType) 
producedDataType.getLogicalType();
+                for (Path file : files) {
+                    rowCount +=
+                            updateStatistics(
+                                    hadoopConfig, file, columnStatisticsMap, 
producedRowType);
+                }
+
+                Map<String, ColumnStats> columnStatsMap =
+                        convertToColumnStats(rowCount, columnStatisticsMap, 
producedRowType);
+
+                return new TableStats(rowCount, columnStatsMap);
+            } catch (Exception e) {
+                return TableStats.UNKNOWN;
+            }
+        }
+
+        private long updateStatistics(
+                Configuration hadoopConf,
+                Path file,
+                Map<String, ColumnStatistics> columnStatisticsMap,
+                RowType producedRowType)
+                throws IOException {
+            org.apache.hadoop.fs.Path path = new 
org.apache.hadoop.fs.Path(file.toUri());
+            Reader reader =
+                    OrcFile.createReader(
+                            path,
+                            OrcFile.readerOptions(hadoopConf)
+                                    
.maxLength(OrcConf.MAX_FILE_LENGTH.getLong(hadoopConf)));
+            ColumnStatistics[] statistics = reader.getStatistics();
+            TypeDescription schema = reader.getSchema();
+            List<String> fieldNames = schema.getFieldNames();
+            List<TypeDescription> columnTypes = schema.getChildren();
+            for (String column : producedRowType.getFieldNames()) {
+                int fieldIdx = fieldNames.indexOf(column);
+                if (fieldIdx >= 0) {
+                    int colId = columnTypes.get(fieldIdx).getId();
+                    ColumnStatistics statistic = statistics[colId];
+                    updateStatistics(statistic, column, columnStatisticsMap);
+                }
+            }
+
+            return reader.getNumberOfRows();
+        }
+
+        private void updateStatistics(
+                ColumnStatistics statistic,
+                String column,
+                Map<String, ColumnStatistics> columnStatisticsMap) {
+            ColumnStatistics previousStatistics = 
columnStatisticsMap.get(column);
+            if (previousStatistics == null) {
+                columnStatisticsMap.put(column, statistic);
+            } else {
+                if (previousStatistics instanceof ColumnStatisticsImpl) {
+                    ((ColumnStatisticsImpl) previousStatistics)
+                            .merge((ColumnStatisticsImpl) statistic);
+                }
+            }
+        }
+
+        private Map<String, ColumnStats> convertToColumnStats(
+                long totalRowCount,
+                Map<String, ColumnStatistics> columnStatisticsMap,
+                RowType logicalType) {
+            Map<String, ColumnStats> columnStatsMap = new HashMap<>();
+            for (String column : logicalType.getFieldNames()) {
+                ColumnStatistics columnStatistics = 
columnStatisticsMap.get(column);
+                if (columnStatistics == null) {
+                    continue;
+                }
+                ColumnStats columnStats =
+                        convertToColumnStats(
+                                totalRowCount,
+                                
logicalType.getTypeAt(logicalType.getFieldIndex(column)),
+                                columnStatistics);
+                columnStatsMap.put(column, columnStats);
+            }
+
+            return columnStatsMap;
+        }
+
+        private ColumnStats convertToColumnStats(
+                long totalRowCount, LogicalType logicalType, ColumnStatistics 
columnStatistics) {
+            ColumnStats.Builder builder =
+                    new 
ColumnStats.Builder().setNdv(null).setAvgLen(null).setMaxLen(null);
+            if (!columnStatistics.hasNull()) {
+                builder.setNullCount(0L);
+            } else {
+                builder.setNullCount(totalRowCount - 
columnStatistics.getNumberOfValues());
+            }
+
+            switch (logicalType.getTypeRoot()) {
+                case BOOLEAN:
+                case ROW:
+                case ARRAY:
+                case MAP:
+                    break;

Review Comment:
   Those unsupported types can be moved to the `default` branch; they all have 
`nullCount`.
   



##########
flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileFormatFactory.java:
##########
@@ -167,5 +190,181 @@ public ChangelogMode getChangelogMode() {
         public void applyFilters(List<ResolvedExpression> filters) {
             this.filters = filters;
         }
+
+        @Override
+        public TableStats reportStatistics(List<Path> files, DataType 
producedDataType) {
+            try {
+                Properties properties = getOrcProperties(formatOptions);
+                Configuration hadoopConfig = new Configuration();
+                properties.forEach((k, v) -> hadoopConfig.set(k.toString(), 
v.toString()));
+
+                long rowCount = 0;
+                Map<String, ColumnStatistics> columnStatisticsMap = new 
HashMap<>();
+                RowType producedRowType = (RowType) 
producedDataType.getLogicalType();
+                for (Path file : files) {
+                    rowCount +=
+                            updateStatistics(
+                                    hadoopConfig, file, columnStatisticsMap, 
producedRowType);
+                }
+
+                Map<String, ColumnStats> columnStatsMap =
+                        convertToColumnStats(rowCount, columnStatisticsMap, 
producedRowType);
+
+                return new TableStats(rowCount, columnStatsMap);
+            } catch (Exception e) {
+                return TableStats.UNKNOWN;
+            }
+        }
+
+        private long updateStatistics(
+                Configuration hadoopConf,
+                Path file,
+                Map<String, ColumnStatistics> columnStatisticsMap,
+                RowType producedRowType)
+                throws IOException {
+            org.apache.hadoop.fs.Path path = new 
org.apache.hadoop.fs.Path(file.toUri());
+            Reader reader =
+                    OrcFile.createReader(
+                            path,
+                            OrcFile.readerOptions(hadoopConf)
+                                    
.maxLength(OrcConf.MAX_FILE_LENGTH.getLong(hadoopConf)));
+            ColumnStatistics[] statistics = reader.getStatistics();
+            TypeDescription schema = reader.getSchema();
+            List<String> fieldNames = schema.getFieldNames();
+            List<TypeDescription> columnTypes = schema.getChildren();
+            for (String column : producedRowType.getFieldNames()) {
+                int fieldIdx = fieldNames.indexOf(column);
+                if (fieldIdx >= 0) {
+                    int colId = columnTypes.get(fieldIdx).getId();
+                    ColumnStatistics statistic = statistics[colId];
+                    updateStatistics(statistic, column, columnStatisticsMap);
+                }
+            }
+
+            return reader.getNumberOfRows();
+        }
+
+        private void updateStatistics(
+                ColumnStatistics statistic,
+                String column,
+                Map<String, ColumnStatistics> columnStatisticsMap) {
+            ColumnStatistics previousStatistics = 
columnStatisticsMap.get(column);
+            if (previousStatistics == null) {
+                columnStatisticsMap.put(column, statistic);
+            } else {
+                if (previousStatistics instanceof ColumnStatisticsImpl) {
+                    ((ColumnStatisticsImpl) previousStatistics)
+                            .merge((ColumnStatisticsImpl) statistic);
+                }
+            }
+        }
+
+        private Map<String, ColumnStats> convertToColumnStats(
+                long totalRowCount,
+                Map<String, ColumnStatistics> columnStatisticsMap,
+                RowType logicalType) {
+            Map<String, ColumnStats> columnStatsMap = new HashMap<>();
+            for (String column : logicalType.getFieldNames()) {
+                ColumnStatistics columnStatistics = 
columnStatisticsMap.get(column);
+                if (columnStatistics == null) {
+                    continue;
+                }
+                ColumnStats columnStats =
+                        convertToColumnStats(
+                                totalRowCount,
+                                
logicalType.getTypeAt(logicalType.getFieldIndex(column)),
+                                columnStatistics);
+                columnStatsMap.put(column, columnStats);
+            }
+
+            return columnStatsMap;
+        }
+
+        private ColumnStats convertToColumnStats(
+                long totalRowCount, LogicalType logicalType, ColumnStatistics 
columnStatistics) {
+            ColumnStats.Builder builder =
+                    new 
ColumnStats.Builder().setNdv(null).setAvgLen(null).setMaxLen(null);
+            if (!columnStatistics.hasNull()) {
+                builder.setNullCount(0L);
+            } else {
+                builder.setNullCount(totalRowCount - 
columnStatistics.getNumberOfValues());
+            }
+
+            switch (logicalType.getTypeRoot()) {
+                case BOOLEAN:
+                case ROW:
+                case ARRAY:
+                case MAP:
+                    break;
+                case TINYINT:
+                case SMALLINT:
+                case INTEGER:
+                case BIGINT:
+                    if (columnStatistics instanceof IntegerColumnStatistics) {
+                        builder.setMax(((IntegerColumnStatistics) 
columnStatistics).getMaximum())
+                                .setMin(((IntegerColumnStatistics) 
columnStatistics).getMinimum());
+                        break;
+                    } else {
+                        return null;
+                    }
+                case FLOAT:
+                case DOUBLE:
+                    if (columnStatistics instanceof DoubleColumnStatistics) {
+                        builder.setMax(((DoubleColumnStatistics) 
columnStatistics).getMaximum())
+                                .setMin(((DoubleColumnStatistics) 
columnStatistics).getMinimum());
+                        break;
+                    } else {
+                        return null;
+                    }
+                case CHAR:
+                case VARCHAR:
+                    if (columnStatistics instanceof StringColumnStatistics) {
+                        builder.setMax(((StringColumnStatistics) 
columnStatistics).getMaximum())
+                                .setMin(((StringColumnStatistics) 
columnStatistics).getMinimum());
+                        break;
+                    } else {
+                        return null;
+                    }
+                case DATE:
+                    if (columnStatistics instanceof DateColumnStatistics) {
+                        Date maximum =
+                                (Date) ((DateColumnStatistics) 
columnStatistics).getMaximum();
+                        Date minimum =
+                                (Date) ((DateColumnStatistics) 
columnStatistics).getMinimum();
+                        builder.setMax(maximum).setMin(minimum);
+                        break;
+                    } else {
+                        return null;
+                    }
+                case TIMESTAMP_WITHOUT_TIME_ZONE:
+                case TIMESTAMP_WITH_TIME_ZONE:
+                    if (columnStatistics instanceof TimestampColumnStatistics) 
{
+                        builder.setMax(((TimestampColumnStatistics) 
columnStatistics).getMaximum())
+                                .setMin(
+                                        ((TimestampColumnStatistics) 
columnStatistics)
+                                                .getMinimum());
+                        break;
+                    } else {
+                        return null;
+                    }
+                case DECIMAL:
+                    if (columnStatistics instanceof DecimalColumnStatistics) {
+                        builder.setMax(
+                                        ((DecimalColumnStatistics) 
columnStatistics)
+                                                .getMaximum()
+                                                .bigDecimalValue())
+                                .setMin(
+                                        ((DecimalColumnStatistics) 
columnStatistics)
+                                                .getMinimum()
+                                                .bigDecimalValue());
+                        break;
+                    } else {
+                        return null;
+                    }
+                default:
+                    return null;

Review Comment:
   This should be `break` here instead of `return null`, so that unsupported types still report their `nullCount`.



##########
flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileFormatFactory.java:
##########
@@ -167,5 +190,181 @@ public ChangelogMode getChangelogMode() {
         public void applyFilters(List<ResolvedExpression> filters) {
             this.filters = filters;
         }
+
+        @Override
+        public TableStats reportStatistics(List<Path> files, DataType 
producedDataType) {
+            try {
+                Properties properties = getOrcProperties(formatOptions);
+                Configuration hadoopConfig = new Configuration();
+                properties.forEach((k, v) -> hadoopConfig.set(k.toString(), 
v.toString()));
+
+                long rowCount = 0;
+                Map<String, ColumnStatistics> columnStatisticsMap = new 
HashMap<>();
+                RowType producedRowType = (RowType) 
producedDataType.getLogicalType();
+                for (Path file : files) {
+                    rowCount +=
+                            updateStatistics(
+                                    hadoopConfig, file, columnStatisticsMap, 
producedRowType);
+                }
+
+                Map<String, ColumnStats> columnStatsMap =
+                        convertToColumnStats(rowCount, columnStatisticsMap, 
producedRowType);
+
+                return new TableStats(rowCount, columnStatsMap);
+            } catch (Exception e) {
+                return TableStats.UNKNOWN;
+            }
+        }
+
+        private long updateStatistics(
+                Configuration hadoopConf,
+                Path file,
+                Map<String, ColumnStatistics> columnStatisticsMap,
+                RowType producedRowType)
+                throws IOException {
+            org.apache.hadoop.fs.Path path = new 
org.apache.hadoop.fs.Path(file.toUri());
+            Reader reader =
+                    OrcFile.createReader(
+                            path,
+                            OrcFile.readerOptions(hadoopConf)
+                                    
.maxLength(OrcConf.MAX_FILE_LENGTH.getLong(hadoopConf)));
+            ColumnStatistics[] statistics = reader.getStatistics();
+            TypeDescription schema = reader.getSchema();
+            List<String> fieldNames = schema.getFieldNames();
+            List<TypeDescription> columnTypes = schema.getChildren();
+            for (String column : producedRowType.getFieldNames()) {
+                int fieldIdx = fieldNames.indexOf(column);
+                if (fieldIdx >= 0) {
+                    int colId = columnTypes.get(fieldIdx).getId();
+                    ColumnStatistics statistic = statistics[colId];
+                    updateStatistics(statistic, column, columnStatisticsMap);
+                }
+            }
+
+            return reader.getNumberOfRows();
+        }
+
+        private void updateStatistics(
+                ColumnStatistics statistic,
+                String column,
+                Map<String, ColumnStatistics> columnStatisticsMap) {
+            ColumnStatistics previousStatistics = 
columnStatisticsMap.get(column);
+            if (previousStatistics == null) {
+                columnStatisticsMap.put(column, statistic);
+            } else {
+                if (previousStatistics instanceof ColumnStatisticsImpl) {
+                    ((ColumnStatisticsImpl) previousStatistics)
+                            .merge((ColumnStatisticsImpl) statistic);
+                }
+            }
+        }
+
+        private Map<String, ColumnStats> convertToColumnStats(
+                long totalRowCount,
+                Map<String, ColumnStatistics> columnStatisticsMap,
+                RowType logicalType) {
+            Map<String, ColumnStats> columnStatsMap = new HashMap<>();
+            for (String column : logicalType.getFieldNames()) {
+                ColumnStatistics columnStatistics = 
columnStatisticsMap.get(column);
+                if (columnStatistics == null) {
+                    continue;
+                }
+                ColumnStats columnStats =
+                        convertToColumnStats(
+                                totalRowCount,
+                                
logicalType.getTypeAt(logicalType.getFieldIndex(column)),
+                                columnStatistics);
+                columnStatsMap.put(column, columnStats);
+            }
+
+            return columnStatsMap;
+        }
+
+        private ColumnStats convertToColumnStats(
+                long totalRowCount, LogicalType logicalType, ColumnStatistics 
columnStatistics) {
+            ColumnStats.Builder builder =
+                    new 
ColumnStats.Builder().setNdv(null).setAvgLen(null).setMaxLen(null);
+            if (!columnStatistics.hasNull()) {
+                builder.setNullCount(0L);
+            } else {
+                builder.setNullCount(totalRowCount - 
columnStatistics.getNumberOfValues());
+            }
+
+            switch (logicalType.getTypeRoot()) {
+                case BOOLEAN:
+                case ROW:
+                case ARRAY:
+                case MAP:
+                    break;

Review Comment:
   These unsupported types can be moved to the `default` branch.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to