luoyuxia commented on code in PR #20549:
URL: https://github.com/apache/flink/pull/20549#discussion_r951099910


##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableMetaStoreFactory.java:
##########
@@ -140,12 +203,194 @@ private void alterPartition(
             partSD.setNumBuckets(sd.getNumBuckets());
             partSD.setSortCols(sd.getSortCols());
             partSD.setLocation(partitionPath.toString());
+            if (autoGatherStatistic) {
+                currentPartition.getParameters().putAll(gatherStats(partitionPath, true));
+            }
             client.alter_partition(database, tableName, currentPartition);
         }
 
+        private Map<String, String> gatherStats(Path path, boolean isForAlterPartition)
+                throws Exception {
+            Map<String, String> statistic = new HashMap<>();
+            Optional<Map<String, String>> stats = gatherFullStats(path);
+            if (stats.isPresent()) {
+                return stats.get();
+            } else {
+                // now, we only gather fileSize and numFiles.
+                // but if it's collecting stats for altering a partition, we can skip
+                // calculating totalSize and numFiles since they'll be recalculated by the Hive
+                // metastore forcibly while altering the partition. so we return -1 directly to
+                // avoid gathering the statistic here since it's redundant and time-consuming
+                long fileSize = 0;
+                int numFiles = 0;
+                if (isForAlterPartition) {
+                    statistic.put(StatsSetupConst.TOTAL_SIZE, String.valueOf(-1));
+                    statistic.put(StatsSetupConst.NUM_FILES, String.valueOf(-1));
+                } else {
+                    for (FileStatus fileStatus :
+                            listDataFileRecursively(fileSystemFactory.create(path.toUri()), path)) {
+                        numFiles += 1;
+                        fileSize += fileStatus.getLen();
+                    }
+                    statistic.put(StatsSetupConst.TOTAL_SIZE, String.valueOf(fileSize));
+                    statistic.put(StatsSetupConst.NUM_FILES, String.valueOf(numFiles));
+                }
+                return statistic;
+            }
+        }
+
         @Override
         public void close() {
             client.close();
+            if (executorService != null) {
+                executorService.shutdown();
+            }
+        }
+
+        private Optional<Map<String, String>> gatherFullStats(Path path) throws Exception {
+            Map<String, String> statistic = new HashMap<>();
+            InputFormat<?, ?> inputFormat =
+                    ReflectionUtil.newInstance(getInputFormatClz(sd.getInputFormat()), conf.conf());
+            if (inputFormat instanceof OrcInputFormat
+                    || inputFormat instanceof MapredParquetInputFormat) {
+                List<Future<CatalogTableStatistics>> statsFutureList = new ArrayList<>();
+                for (FileStatus fileStatus :
+                        listDataFileRecursively(fileSystemFactory.create(path.toUri()), path)) {
+                    InputSplit dummySplit =
+                            new FileSplit(
+                                    toHadoopPath(fileStatus.getPath()),
+                                    0,
+                                    -1,
+                                    new String[] {sd.getLocation()});
+                    org.apache.hadoop.mapred.RecordReader<?, ?> recordReader =
+                            inputFormat.getRecordReader(dummySplit, conf.conf(), Reporter.NULL);
+                    if (recordReader instanceof StatsProvidingRecordReader) {
+                        statsFutureList.add(
+                                submitStatsGatherTask(
+                                        new FileStatisticGather(
+                                                fileStatus,
+                                                (StatsProvidingRecordReader) recordReader)));
+                    } else {
+                        // theoretically we won't fall into here if the inputFormat is an instance
+                        // of OrcInputFormat or MapredParquetInputFormat, but Hive's implementation
+                        // may change, which may cause falling into here.
+                        LOG.warn(
+                                "The inputFormat is instanceof OrcInputFormat or MapredParquetInputFormat,"
+                                        + " but the RecordReader from the inputFormat is not instance of StatsProvidingRecordReader."
+                                        + " So the statistic numRows/rawDataSize can't be gathered");
+                        statsFutureList.forEach(
+                                catalogTableStatisticsFuture ->
+                                        catalogTableStatisticsFuture.cancel(true));
+                        return Optional.empty();
+                    }
+                }
+                List<CatalogTableStatistics> statsList = new ArrayList<>();
+                for (Future<CatalogTableStatistics> future : statsFutureList) {
+                    statsList.add(future.get());
+                }
+                HiveStatsUtil.updateStats(accumulate(statsList), statistic);
+                return Optional.of(statistic);
+            } else {
+                // if the input format is neither OrcInputFormat nor MapredParquetInputFormat,
+                // we can't gather full statistic in current implementation.
+                // so return empty.
+                return Optional.empty();
+            }
+        }
+
+        /** List data files recursively. */
+        private List<FileStatus> listDataFileRecursively(FileSystem fileSystem, Path f)
+                throws IOException {
+            List<FileStatus> fileStatusList = new ArrayList<>();
+            for (FileStatus fileStatus : fileSystem.listStatus(f)) {
+                if (fileStatus.isDir() && !isStagingDir(fileStatus.getPath())) {
+                    fileStatusList.addAll(
+                            listDataFileRecursively(fileSystem, fileStatus.getPath()));
+                } else {
+                    if (isDataFile(fileStatus)) {
+                        fileStatusList.add(fileStatus);
+                    }
+                }
+            }
+            return fileStatusList;
+        }
+
+        private boolean isStagingDir(Path path) {
+            // in batch mode, the name for staging dir starts with '.staging_', see
+            // HiveTableSink#toStagingDir
+            // in stream mode, the stage dir is the partition/table location, but the staging files
+            // start with '.'
+            return path.getPath().startsWith(".");
+        }
+
+        private boolean isDataFile(FileStatus fileStatus) {
+            String fileName = fileStatus.getPath().getName();
+            return !fileName.startsWith(".")
+                    && !fileName.startsWith("_")
+                    && !fileName.equals(successFileName);
+        }
+
+        private Class<? extends InputFormat<?, ?>> getInputFormatClz(String inputFormatClz) {
+            try {
+                return (Class<? extends InputFormat<?, ?>>)
+                        Class.forName(
+                                inputFormatClz,
+                                true,
+                                Thread.currentThread().getContextClassLoader());
+            } catch (ClassNotFoundException e) {
+                throw new FlinkHiveException(
+                        String.format(
+                                "Unable to load the class of the input format 
%s.", inputFormatClz),
+                        e);
+            }
+        }
+
+        private Future<CatalogTableStatistics> submitStatsGatherTask(
+                Callable<CatalogTableStatistics> statsGatherTask) {
+            if (executorService == null) {
+                executorService =
+                        gatherStatsThreadNum == 1
+                                ? newDirectExecutorService()
+                                : Executors.newFixedThreadPool(3);

Review Comment:
   Good catch. That's a mistake here.
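   Presumably the pool size should come from gatherStatsThreadNum instead of the hard-coded 3. A minimal sketch of the intended branch, assuming the fields from this PR (executorService, gatherStatsThreadNum) and that the method finishes by submitting the task:

       // lazily create the executor that runs the per-file stats gathering tasks
       if (executorService == null) {
           executorService =
                   gatherStatsThreadNum == 1
                           // with a single thread, run tasks directly in the calling thread
                           ? newDirectExecutorService()
                           // otherwise size the pool from the configured thread number,
                           // not a hard-coded constant
                           : Executors.newFixedThreadPool(gatherStatsThreadNum);
       }
       // assumption: the task is then handed to the (possibly direct) executor
       return executorService.submit(statsGatherTask);

   Keeping the direct executor for the single-thread case avoids spinning up an extra pool when stats gathering is effectively sequential.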


