luoyuxia commented on code in PR #20549:
URL: https://github.com/apache/flink/pull/20549#discussion_r950130698


##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java:
##########
@@ -576,6 +581,147 @@ public void testWriteSuccessFile() throws Exception {
         assertThat(new File(changeFileNameTablePath, 
"dt=2022-08-15/_ZM")).exists();
     }
 
+    @Test
+    public void testAutoGatherStatisticForBatchWriting() throws Exception {
+        TableEnvironment tEnv = 
HiveTestUtils.createTableEnvInBatchMode(SqlDialect.HIVE);
+        tEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
+        tEnv.useCatalog(hiveCatalog.getName());
+        String wareHouse = 
hiveCatalog.getHiveConf().getVar(HiveConf.ConfVars.METASTOREWAREHOUSE);
+        // test non-partition table
+        tEnv.executeSql("create table t1(x int)");
+        tEnv.executeSql("create table t2(x int) stored as orc");
+        tEnv.executeSql("create table t3(x int) stored as parquet");
+        tEnv.executeSql("insert into t1 values (1)").await();
+        tEnv.executeSql("insert into t2 values (1)").await();
+        tEnv.executeSql("insert into t3 values (1)").await();
+        // check the statistic for these table
+        // the statistics should be empty since the auto gather statistic is 
disabled
+        for (int i = 1; i <= 3; i++) {
+            CatalogTableStatistics statistics =
+                    hiveCatalog.getTableStatistics(new ObjectPath("default", 
"t" + i));
+            assertThat(statistics).isEqualTo(CatalogTableStatistics.UNKNOWN);
+        }
+        // now enable auto gather statistic
+        
tEnv.getConfig().set(HiveOptions.TABLE_EXEC_HIVE_SINK_STATISTIC_AUTO_GATHER_ENABLE,
 true);
+        tEnv.executeSql("insert into t1 values (1)").await();
+        tEnv.executeSql("insert into t2 values (1)").await();
+        tEnv.executeSql("insert into t3 values (1)").await();
+        CatalogTableStatistics statistics =
+                hiveCatalog.getTableStatistics(new ObjectPath("default", 
"t1"));
+        // t1 is neither stored as orc nor parquet, so only fileCount and 
totalSize is
+        // calculated
+        assertThat(statistics)
+                .isEqualTo(
+                        new CatalogTableStatistics(
+                                -1, 2, getPathSize(Paths.get(wareHouse, 
"t1")), -1));
+        statistics = hiveCatalog.getTableStatistics(new ObjectPath("default", 
"t2"));
+        assertThat(statistics)
+                .isEqualTo(
+                        new CatalogTableStatistics(
+                                2, 2, getPathSize(Paths.get(wareHouse, "t2")), 
8));
+        statistics = hiveCatalog.getTableStatistics(new ObjectPath("default", 
"t3"));
+        assertThat(statistics)
+                .isEqualTo(
+                        new CatalogTableStatistics(
+                                2, 2, getPathSize(Paths.get(wareHouse, "t3")), 
66));
+
+        // test partition table
+        tEnv.executeSql("create table pt1(x int) partitioned by (y int)");
+        tEnv.executeSql("create table pt2(x int) partitioned by (y int) stored 
as orc");
+        tEnv.executeSql("create table pt3(x int) partitioned by (y int) stored 
as parquet");
+        tEnv.executeSql("insert into pt1 partition(y=1) values (1)").await();
+        tEnv.executeSql("insert into pt2 partition(y=2) values (2)").await();
+        tEnv.executeSql("insert into pt3 partition(y=3) values (3)").await();
+
+        // verify statistic
+        statistics =
+                hiveCatalog.getPartitionStatistics(
+                        new ObjectPath("default", "pt1"),
+                        new CatalogPartitionSpec(Collections.singletonMap("y", 
"1")));
+        assertThat(statistics)
+                .isEqualTo(
+                        new CatalogTableStatistics(
+                                -1, 1, getPathSize(Paths.get(wareHouse, "pt1", 
"y=1")), -1));
+        statistics =
+                hiveCatalog.getPartitionStatistics(
+                        new ObjectPath("default", "pt2"),
+                        new CatalogPartitionSpec(Collections.singletonMap("y", 
"2")));
+        assertThat(statistics)
+                .isEqualTo(
+                        new CatalogTableStatistics(
+                                1, 1, getPathSize(Paths.get(wareHouse, "pt2", 
"y=2")), 4));
+        statistics =
+                hiveCatalog.getPartitionStatistics(
+                        new ObjectPath("default", "pt3"),
+                        new CatalogPartitionSpec(Collections.singletonMap("y", 
"3")));
+        assertThat(statistics)
+                .isEqualTo(
+                        new CatalogTableStatistics(
+                                1, 1, getPathSize(Paths.get(wareHouse, "pt3", 
"y=3")), 33));
+
+        // insert data into partition again
+        tEnv.executeSql("insert into pt1 partition(y=1) values (1)").await();
+        tEnv.executeSql("insert into pt2 partition(y=2) values (2)").await();
+        tEnv.executeSql("insert into pt3 partition(y=3) values (3)").await();
+
+        // verify statistic
+        statistics =
+                hiveCatalog.getPartitionStatistics(
+                        new ObjectPath("default", "pt1"),
+                        new CatalogPartitionSpec(Collections.singletonMap("y", 
"1")));
+        assertThat(statistics)
+                .isEqualTo(
+                        new CatalogTableStatistics(
+                                -1, 2, getPathSize(Paths.get(wareHouse, "pt1", 
"y=1")), -1));
+
+        statistics =
+                hiveCatalog.getPartitionStatistics(
+                        new ObjectPath("default", "pt2"),
+                        new CatalogPartitionSpec(Collections.singletonMap("y", 
"2")));
+
+        assertThat(statistics)
+                .isEqualTo(
+                        new CatalogTableStatistics(
+                                2, 2, getPathSize(Paths.get(wareHouse, "pt2", 
"y=2")), 8));
+
+        statistics =
+                hiveCatalog.getPartitionStatistics(
+                        new ObjectPath("default", "pt3"),
+                        new CatalogPartitionSpec(Collections.singletonMap("y", 
"3")));
+        assertThat(statistics)
+                .isEqualTo(
+                        new CatalogTableStatistics(
+                                2, 2, getPathSize(Paths.get(wareHouse, "pt3", 
"y=3")), 66));
+
+        // test overwrite table/partition
+        tEnv.executeSql("create table src(x int)");
+        tEnv.executeSql("insert overwrite table pt1 partition(y=1) select * 
from src").await();
+        tEnv.executeSql("insert overwrite table pt2 partition(y=2) select * 
from src").await();
+        tEnv.executeSql("insert overwrite table pt3 partition(y=3) select * 
from src").await();
+
+        for (int i = 1; i <= 3; i++) {
+            statistics =
+                    hiveCatalog.getPartitionStatistics(
+                            new ObjectPath("default", "pt" + i),
+                            new CatalogPartitionSpec(
+                                    Collections.singletonMap("y", 
String.valueOf(i))));
+            assertThat(statistics).isEqualTo(CatalogTableStatistics.UNKNOWN);

Review Comment:
   We do report zero stats to the metastore, but the method 
[HiveStatsUtil#createCatalogTableStatistics](https://github.com/apache/flink/blob/6fb49b1d708137605e4cd90175be78389290cea5/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveStatsUtil.java#L337)
 simply treats `0` as `UNKNOWN` when getting statistics from the metastore.



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableMetaStoreFactory.java:
##########
@@ -140,12 +187,124 @@ private void alterPartition(
             partSD.setNumBuckets(sd.getNumBuckets());
             partSD.setSortCols(sd.getSortCols());
             partSD.setLocation(partitionPath.toString());
+            if (autoGatherStatistic) {
+                
currentPartition.getParameters().putAll(gatherStats(partitionPath, true));
+            }
             client.alter_partition(database, tableName, currentPartition);
         }
 
+        private Map<String, String> gatherStats(Path path, boolean 
isForAlterPartition)
+                throws Exception {
+            Map<String, String> statistic = new HashMap<>();
+            InputFormat<?, ?> inputFormat =
+                    
ReflectionUtil.newInstance(getInputFormatClz(sd.getInputFormat()), conf.conf());
+            long numRows = 0;
+            long fileSize = 0;
+            int numFiles = 0;
+            long rawDataSize = 0;
+            List<FileStatus> fileStatuses =
+                    
listDataFileRecursively(fileSystemFactory.create(path.toUri()), path);
+            for (FileStatus file : fileStatuses) {
+                InputSplit dummySplit =
+                        new FileSplit(
+                                toHadoopPath(file.getPath()),
+                                0,
+                                -1,
+                                new String[] {sd.getLocation()});
+                org.apache.hadoop.mapred.RecordReader<?, ?> recordReader =
+                        inputFormat.getRecordReader(dummySplit, conf.conf(), 
Reporter.NULL);
+                try {
+                    if (recordReader instanceof StatsProvidingRecordReader) {
+                        StatsProvidingRecordReader statsRR =
+                                (StatsProvidingRecordReader) recordReader;
+                        rawDataSize += statsRR.getStats().getRawDataSize();
+                        numRows += statsRR.getStats().getRowCount();
+                        fileSize += file.getLen();
+                        numFiles += 1;
+                    } else {
+                        // if the reader initialized according to input format 
class isn't instance

Review Comment:
   I'll refine this logic.



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java:
##########
@@ -134,6 +134,17 @@ public class HiveOptions {
     public static final ConfigOption<String> 
SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME =
             FileSystemConnectorOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME;
 
+    public static final ConfigOption<Boolean> 
TABLE_EXEC_HIVE_SINK_STATISTIC_AUTO_GATHER_ENABLE =
+            key("table.exec.hive.sink.statistic-auto-gather.enable")
+                    .booleanType()
+                    .defaultValue(false)

Review Comment:
   Hive enables it by default, but Spark disables it by default. The reason I 
disabled it by default is that gathering statistics may be time-consuming, and 
users may find their jobs slower after upgrading to Flink 1.16 if they haven't 
noticed that Flink auto-gathers statistics since 1.16.
   But after discussing with @Tartarus0zm, I changed my mind: I think we can 
just follow Hive's default behavior, since we want to provide compatibility 
with Hive.
   Also, I'll note in the documentation that enabling automatic statistics 
gathering may take some extra time.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to