luoyuxia commented on code in PR #20084:
URL: https://github.com/apache/flink/pull/20084#discussion_r908295003


##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java:
##########
@@ -261,6 +279,93 @@ public DynamicTableSource copy() {
         return source;
     }
 
+    @Override
+    public TableStats reportStatistics() {
+        try {
+            // only support BOUNDED source
+            if (isStreamingSource()) {
+                return TableStats.UNKNOWN;
+            }
+            if (flinkConf.get(HiveOptions.SOURCE_REPORT_STATISTICS)
+                    != FileSystemConnectorOptions.FileStatisticsType.ALL) {
+                return TableStats.UNKNOWN;
+            }
+
+            HiveSourceBuilder sourceBuilder =
+                    new HiveSourceBuilder(jobConf, flinkConf, tablePath, hiveVersion, catalogTable)
+                            .setProjectedFields(projectedFields)
+                            .setLimit(limit);
+            int threadNum =
+                    flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM);
+            List<HiveTablePartition> hivePartitionsToRead =
+                    getAllPartitions(
+                            jobConf,
+                            hiveVersion,
+                            tablePath,
+                            catalogTable.getPartitionKeys(),
+                            remainingPartitions);
+            BulkFormat<RowData, HiveSourceSplit> defaultBulkFormat =
+                    sourceBuilder.createDefaultBulkFormat();
+            List<HiveSourceSplit> inputSplits =
+                    HiveSourceFileEnumerator.createInputSplits(
+                            0, hivePartitionsToRead, threadNum, jobConf);
+            if (inputSplits.size() != 0) {
+                if (defaultBulkFormat instanceof FileBasedStatisticsReportableInputFormat) {
+                    return ((FileBasedStatisticsReportableInputFormat) defaultBulkFormat)
+                            .reportStatistics(
+                                    inputSplits.stream()
+                                            .map(FileSourceSplit::path)
+                                            .collect(Collectors.toList()),
+                                    catalogTable.getSchema().toRowDataType());
+                } else {
+                    return getMapRedInputFormatStatistics(
+                            inputSplits, catalogTable.getSchema().toRowDataType());
+                }
+            } else {
+                return TableStats.UNKNOWN;
+            }
+
+        } catch (Exception e) {
+            return TableStats.UNKNOWN;
+        }
+    }
+
+    private TableStats getMapRedInputFormatStatistics(
+            List<HiveSourceSplit> inputSplits, DataType producedDataType) {
+        if (inputSplits.isEmpty()) {
+            return TableStats.UNKNOWN;
+        }
+        // TODO now we assume that one hive external table has only one storage file format
+        HiveTablePartition hiveTablePartition = inputSplits.get(0).getHiveTablePartition();
+        StorageDescriptor sd = hiveTablePartition.getStorageDescriptor();
+        List<Path> files =
+                inputSplits.stream().map(FileSourceSplit::path).collect(Collectors.toList());
+        InputFormat mapredInputFormat;
+        try {
+            mapredInputFormat =
+                    (InputFormat)
+                            Class.forName(
+                                            sd.getInputFormat(),
+                                            true,
+                                            Thread.currentThread().getContextClassLoader())
+                                    .newInstance();
+            if (mapredInputFormat instanceof TextInputFormat) {
+                // only support csv format.
+                return CsvFormatStatisticsReportUtil.getTableStatistics(files);
+            } else if (mapredInputFormat instanceof OrcInputFormat) {
+                return OrcFormatStatisticsReportUtil.getTableStatistics(
+                        files, producedDataType, jobConf);
+            } else if (mapredInputFormat instanceof MapredParquetInputFormat) {

Review Comment:
   What about `DeprecatedParquetInputFormat`?
   I think it would be better to check via `sd.getSerdeInfo().getSerializationLib()`, just like `HiveInputFormat#useParquetVectorizedRead` does.
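   
   A minimal sketch of that check, assuming the Hive metastore `StorageDescriptor`/`SerDeInfo` API; the helper name `isParquetFormat` and its placement are hypothetical, and the substring test mirrors the one used in `HiveInputFormat#useParquetVectorizedRead`:
   
   ```java
   import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
   
   // Hypothetical helper: decide whether a partition stores Parquet by its
   // serde library rather than by the input-format class, so that both
   // MapredParquetInputFormat and DeprecatedParquetInputFormat are covered.
   final class ParquetFormatCheck {
       static boolean isParquetFormat(StorageDescriptor sd) {
           String serdeLib = sd.getSerdeInfo().getSerializationLib();
           // e.g. "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"
           return serdeLib != null && serdeLib.toLowerCase().contains("parquet");
       }
   }
   ```
   
   The `instanceof MapredParquetInputFormat` branch above would then become something like `else if (isParquetFormat(sd))`.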



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
