Tartarus0zm commented on code in PR #20549:
URL: https://github.com/apache/flink/pull/20549#discussion_r951089906
##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableMetaStoreFactory.java:
##########
@@ -140,12 +203,194 @@ private void alterPartition(
partSD.setNumBuckets(sd.getNumBuckets());
partSD.setSortCols(sd.getSortCols());
partSD.setLocation(partitionPath.toString());
+        if (autoGatherStatistic) {
+            currentPartition.getParameters().putAll(gatherStats(partitionPath, true));
+        }
client.alter_partition(database, tableName, currentPartition);
}
+
+    private Map<String, String> gatherStats(Path path, boolean isForAlterPartition)
+            throws Exception {
+        Map<String, String> statistic = new HashMap<>();
+        Optional<Map<String, String>> stats = gatherFullStats(path);
+        if (stats.isPresent()) {
+            return stats.get();
+        } else {
+            // for now, we only gather fileSize and numFiles.
+            // but if the stats are collected to alter a partition, we can skip
+            // calculating totalSize and numFiles since the Hive metastore will
+            // recalculate them forcibly while altering the partition. so we return -1
+            // directly to avoid gathering the statistics here, which would be
+            // redundant and time-consuming.
+            long fileSize = 0;
+            int numFiles = 0;
+            if (isForAlterPartition) {
+                statistic.put(StatsSetupConst.TOTAL_SIZE, String.valueOf(-1));
+                statistic.put(StatsSetupConst.NUM_FILES, String.valueOf(-1));
+            } else {
+                for (FileStatus fileStatus :
+                        listDataFileRecursively(fileSystemFactory.create(path.toUri()), path)) {
+                    numFiles += 1;
+                    fileSize += fileStatus.getLen();
+                }
+                statistic.put(StatsSetupConst.TOTAL_SIZE, String.valueOf(fileSize));
+                statistic.put(StatsSetupConst.NUM_FILES, String.valueOf(numFiles));
+            }
+            return statistic;
+        }
+    }
+
@Override
public void close() {
client.close();
+ if (executorService != null) {
+ executorService.shutdown();
+ }
+ }
+
+    private Optional<Map<String, String>> gatherFullStats(Path path) throws Exception {
+        Map<String, String> statistic = new HashMap<>();
+        InputFormat<?, ?> inputFormat =
+                ReflectionUtil.newInstance(getInputFormatClz(sd.getInputFormat()), conf.conf());
+        if (inputFormat instanceof OrcInputFormat
+                || inputFormat instanceof MapredParquetInputFormat) {
+            List<Future<CatalogTableStatistics>> statsFutureList = new ArrayList<>();
+            for (FileStatus fileStatus :
+                    listDataFileRecursively(fileSystemFactory.create(path.toUri()), path)) {
+                InputSplit dummySplit =
+                        new FileSplit(
+                                toHadoopPath(fileStatus.getPath()),
+                                0,
+                                -1,
+                                new String[] {sd.getLocation()});
+                org.apache.hadoop.mapred.RecordReader<?, ?> recordReader =
+                        inputFormat.getRecordReader(dummySplit, conf.conf(), Reporter.NULL);
+                if (recordReader instanceof StatsProvidingRecordReader) {
+                    statsFutureList.add(
+                            submitStatsGatherTask(
+                                    new FileStatisticGather(
+                                            fileStatus,
+                                            (StatsProvidingRecordReader) recordReader)));
+                } else {
+                    // theoretically we won't fall into this branch when the inputFormat is an
+                    // instance of OrcInputFormat or MapredParquetInputFormat, but Hive's
+                    // implementation may change, which could make us end up here.
+                    LOG.warn(
+                            "The inputFormat is an instance of OrcInputFormat or MapredParquetInputFormat,"
+                                    + " but the RecordReader from the inputFormat is not an instance of"
+                                    + " StatsProvidingRecordReader, so the statistics numRows/rawDataSize"
+                                    + " can't be gathered.");
+                    statsFutureList.forEach(
+                            catalogTableStatisticsFuture ->
+                                    catalogTableStatisticsFuture.cancel(true));
+                    return Optional.empty();
+                }
+            }
+            List<CatalogTableStatistics> statsList = new ArrayList<>();
+            for (Future<CatalogTableStatistics> future : statsFutureList) {
+                statsList.add(future.get());
+            }
+            HiveStatsUtil.updateStats(accumulate(statsList), statistic);
+            return Optional.of(statistic);
+        } else {
+            // if the input format is neither OrcInputFormat nor MapredParquetInputFormat,
+            // we can't gather full statistics in the current implementation, so return empty.
+            return Optional.empty();
+        }
+    }
+
+    /** Lists data files recursively. */
+    private List<FileStatus> listDataFileRecursively(FileSystem fileSystem, Path f)
+            throws IOException {
+        List<FileStatus> fileStatusList = new ArrayList<>();
+        for (FileStatus fileStatus : fileSystem.listStatus(f)) {
+            if (fileStatus.isDir() && !isStagingDir(fileStatus.getPath())) {
+                fileStatusList.addAll(
+                        listDataFileRecursively(fileSystem, fileStatus.getPath()));
+            } else if (isDataFile(fileStatus)) {
+                fileStatusList.add(fileStatus);
+            }
+        }
+        return fileStatusList;
+    }
+
+    private boolean isStagingDir(Path path) {
+        // in batch mode, the name of the staging dir starts with '.staging_', see
+        // HiveTableSink#toStagingDir;
+        // in stream mode, the staging dir is the partition/table location, but the
+        // staging files start with '.'
+        return path.getPath().startsWith(".");
+    }
+
+ private boolean isDataFile(FileStatus fileStatus) {
+ String fileName = fileStatus.getPath().getName();
+ return !fileName.startsWith(".")
+ && !fileName.startsWith("_")
+ && !fileName.equals(successFileName);
+ }
+
+    private Class<? extends InputFormat<?, ?>> getInputFormatClz(String inputFormatClz) {
+        try {
+            return (Class<? extends InputFormat<?, ?>>)
+                    Class.forName(
+                            inputFormatClz,
+                            true,
+                            Thread.currentThread().getContextClassLoader());
+        } catch (ClassNotFoundException e) {
+            throw new FlinkHiveException(
+                    String.format(
+                            "Unable to load the class of the input format %s.", inputFormatClz),
+                    e);
+        }
+    }
+
+    private Future<CatalogTableStatistics> submitStatsGatherTask(
+            Callable<CatalogTableStatistics> statsGatherTask) {
+        if (executorService == null) {
+            executorService =
+                    gatherStatsThreadNum == 1
+                            ? newDirectExecutorService()
+                            : Executors.newFixedThreadPool(3);
Review Comment:
   I think the size of the fixed thread pool should not be a constant; shouldn't it use the user-configured size instead?
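   For reference, a minimal sketch of what this suggestion could look like (not the PR's actual code): it reuses the gatherStatsThreadNum field that the "== 1" check above already reads, and the final submit call is an assumption, since the diff is truncated at the review marker.

       private Future<CatalogTableStatistics> submitStatsGatherTask(
               Callable<CatalogTableStatistics> statsGatherTask) {
           // lazily create the executor; a direct executor avoids thread
           // overhead when only one gather thread is configured
           if (executorService == null) {
               executorService =
                       gatherStatsThreadNum == 1
                               ? newDirectExecutorService()
                               // sketch: size the pool from the user-configured
                               // value instead of the hard-coded constant 3
                               : Executors.newFixedThreadPool(gatherStatsThreadNum);
           }
           // assumed continuation: hand the per-file gather task to the pool
           return executorService.submit(statsGatherTask);
       }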
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]