godfreyhe commented on code in PR #20422:
URL: https://github.com/apache/flink/pull/20422#discussion_r939681964


##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java:
##########
@@ -720,6 +720,25 @@ public CatalogTableStatistics getPartitionStatistics(
         return result != null ? result.copy() : CatalogTableStatistics.UNKNOWN;
     }
 
+    /**
+     * For in memory catalog, just iteratively call {@link 
#getPartitionStatistics(ObjectPath,
+     * CatalogPartitionSpec)} for each partition.
+     */
+    @Override
+    public List<CatalogTableStatistics> bulkGetPartitionStatistics(

Review Comment:
   the implementation is totally same the the default one, I think we can 
remove it



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkRecomputeStatisticsProgram.java:
##########
@@ -183,36 +181,29 @@ private TableStats getPartitionsTableStats(
             } else {
                 partitionList = partitionPushDownSpec.getPartitions();
             }
-            for (Map<String, String> partition : partitionList) {
-                Optional<TableStats> partitionStats =
-                        getPartitionStats(catalog, tablePath, partition);
-                if (!partitionStats.isPresent()) {
-                    // clear all information before
-                    newTableStat = null;
-                    break;
-                } else {
-                    newTableStat =
-                            newTableStat == null
-                                    ? partitionStats.get()
-                                    : newTableStat.merge(partitionStats.get());
-                }
-            }
+            return getPartitionStats(
+                            table.contextResolvedTable().getCatalog().get(),
+                            
table.contextResolvedTable().getIdentifier().toObjectPath(),
+                            partitionList)
+                    .get();
         }
 
-        return newTableStat;
+        return TableStats.UNKNOWN;
     }
 
     private Optional<TableStats> getPartitionStats(
-            Catalog catalog, ObjectPath tablePath, Map<String, String> 
partition) {
+            Catalog catalog, ObjectPath tablePath, List<Map<String, String>> 
partition) {
         try {
-            CatalogPartitionSpec spec = new CatalogPartitionSpec(partition);
-            CatalogTableStatistics partitionStat = 
catalog.getPartitionStatistics(tablePath, spec);
-            CatalogColumnStatistics partitionColStat =
-                    catalog.getPartitionColumnStatistics(tablePath, spec);
-            TableStats stats =
-                    CatalogTableStatisticsConverter.convertToTableStats(
-                            partitionStat, partitionColStat);
-            return Optional.of(stats);
+
+            final List<CatalogPartitionSpec> partitionSpecs =
+                    partition.stream()
+                            .map(p -> new CatalogPartitionSpec(p))

Review Comment:
   nit: .map(CatalogPartitionSpec::new)



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/CatalogTableStatisticsConverter.java:
##########
@@ -61,6 +63,27 @@ public static TableStats convertToTableStats(
         return new TableStats(rowCount, columnStatsMap);
     }
 
+    public static TableStats convertToAccumulatedTableStates(
+            List<CatalogTableStatistics> tableStatisticsList,
+            List<CatalogColumnStatistics> catalogColumnStatisticsList) {
+        final Optional<TableStats> rowCountMergedTableStats =
+                tableStatisticsList.stream()
+                        .map(p -> 
CatalogTableStatisticsConverter.convertToTableStats(p, null))
+                        .reduce((s1, s2) -> s1.merge(s2));
+
+        final Optional<TableStats> columnStatsMergedTableStats =
+                catalogColumnStatisticsList.stream()
+                        .map(
+                                p ->
+                                        
CatalogTableStatisticsConverter.convertToTableStats(
+                                                CatalogTableStatistics.EMPTY, 
p))
+                        .reduce((s1, s2) -> s1.merge(s2));

Review Comment:
   We can just convert a CatalogTableStatistics and its corresponding  
CatalogColumnStatistics into a TableStats, and then call TableStats#merge. If 
that, CatalogTableStatistics.EMPTY, StatisticDataUtils can be removed. There's 
no calculation here, and even CatalogColumnStatisticDataBuilder is no needed. 
This pr will become very simple



##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java:
##########
@@ -240,6 +246,84 @@ void testAlterPartitionColumnStatistics() throws Exception 
{
                 catalog.getPartitionColumnStatistics(path1, partitionSpec));
     }
 
+    private ResolvedSchema createSchemaForStatistics() {
+        return new ResolvedSchema(
+                Arrays.asList(
+                        Column.physical("first", DataTypes.STRING()),
+                        Column.physical("four", DataTypes.BOOLEAN()),
+                        Column.physical("five", DataTypes.DOUBLE()),
+                        Column.physical("second", DataTypes.DATE()),
+                        Column.physical("third", DataTypes.INT())),
+                Collections.emptyList(),
+                null);
+    }
+
+    @Test
+    public void testGetTableAndColumnStatistics() throws Exception {
+
+        // create a test table
+        catalog.createDatabase(db1, createDb(), false);
+        final ResolvedSchema resolvedSchema = createSchemaForStatistics();
+        final CatalogTable origin =
+                CatalogTable.of(
+                        
Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
+                        TEST_COMMENT,
+                        createPartitionKeys(),
+                        getBatchTableProperties());
+        CatalogTable catalogTable = new ResolvedCatalogTable(origin, 
resolvedSchema);
+
+        catalog.createTable(path1, catalogTable, false);
+
+        int second = 29;
+        final int partitionFileSize = 20;
+        final String partitionName = "third";
+
+        List<Map<String, String>> partitionSpecs = new ArrayList<>();
+        Map<String, String> partitionSpec = null;
+
+        while (second-- > 10) {
+            int third = 20;
+            while (third-- > 0) {
+                partitionSpec = new HashMap<String, String>();
+                partitionSpec.put("second", "2010-04-" + second + " 00:00:00");
+                partitionSpec.put(partitionName, Integer.toString(third));
+                createPartition(partitionSpec);
+                partitionSpecs.add(partitionSpec);
+            }
+        }
+
+        final List<CatalogPartitionSpec> catalogPartitionSpecList =
+                partitionSpecs.stream()
+                        .map(p -> new CatalogPartitionSpec(p))
+                        .collect(Collectors.toList());
+
+        final List<CatalogTableStatistics> partitionStatistics =
+                catalog.bulkGetPartitionStatistics(path1, 
catalogPartitionSpecList);
+        
assertThat(partitionStatistics.size()).isEqualTo(partitionSpecs.size());
+
+        final List<CatalogColumnStatistics> tableColumnStatistics =
+                catalog.bulkGetPartitionColumnStatistics(path1, 
catalogPartitionSpecList);
+
+        
assertThat(tableColumnStatistics.size()).isEqualTo(catalogPartitionSpecList.size()
 + 1);
+
+        final CatalogColumnStatistics catalogColumnStatistics =
+                tableColumnStatistics.get(tableColumnStatistics.size() - 1);
+        CatalogColumnStatisticsDataLong columnStatisticsDataLong =
+                (CatalogColumnStatisticsDataLong)
+                        
catalogColumnStatistics.getColumnStatisticsData().get(partitionName);
+
+        
assertThat(columnStatisticsDataLong.getNdv()).isEqualTo(catalogPartitionSpecList.size());
+        assertThat(columnStatisticsDataLong.getMin()).isEqualTo(0);
+        
assertThat(columnStatisticsDataLong.getMax()).isEqualTo(partitionFileSize - 1);

Review Comment:
   other columns' statistics does not been verified



##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java:
##########
@@ -438,6 +440,38 @@ public CatalogTableStatistics getPartitionStatistics(
         return CatalogTableStatistics.UNKNOWN;
     }
 
+    @Override
+    public List<CatalogTableStatistics> bulkGetPartitionStatistics(

Review Comment:
   I think we can just use the default implementation, and remove this method.



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java:
##########
@@ -1699,6 +1716,69 @@ public CatalogTableStatistics getPartitionStatistics(
         }
     }
 
+    @Override
+    public List<CatalogTableStatistics> bulkGetPartitionStatistics(
+            ObjectPath tablePath, List<CatalogPartitionSpec> partitionSpecs)
+            throws PartitionNotExistException, CatalogException {
+
+        return getTableAndPartitions(tablePath, partitionSpecs).f1.stream()
+                .map(p -> createCatalogTableStatistics(p.getParameters()))
+                .collect(Collectors.toList());
+    }
+
+    /** @return Partitions and names of partitions. */
+    private Tuple3<Table, List<Partition>, List<String>> getTableAndPartitions(

Review Comment:
   We can just split this method into two methods, one is to get the HiveTable, 
and another is to get partitions. That could make the result type more simple 
and readable



##########
flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestCatalogFactory.java:
##########
@@ -316,13 +316,27 @@ public CatalogTableStatistics getPartitionStatistics(
             throw new UnsupportedOperationException();
         }
 
+        @Override
+        public List<CatalogTableStatistics> bulkGetPartitionStatistics(
+                ObjectPath tablePath, List<CatalogPartitionSpec> 
partitionSpecs)
+                throws PartitionNotExistException, CatalogException {
+            throw new UnsupportedClassVersionError();

Review Comment:
   should throw `UnsupportedOperationException`



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java:
##########
@@ -1734,6 +1814,152 @@ public CatalogColumnStatistics 
getPartitionColumnStatistics(
         }
     }
 
+    @Override
+    public List<CatalogColumnStatistics> bulkGetPartitionColumnStatistics(
+            ObjectPath tablePath, List<CatalogPartitionSpec> partitionSpecs)
+            throws PartitionNotExistException, CatalogException {
+
+        checkNotNull(partitionSpecs);
+
+        List<CatalogColumnStatistics> result = new 
ArrayList<>(partitionSpecs.size());
+
+        final Tuple3<Table, List<Partition>, List<String>> partitionsTuple3 =
+                getTableAndPartitions(tablePath, partitionSpecs);
+
+        try {
+
+            final List<FieldSchema> distinctColumns =
+                    partitionsTuple3.f1.stream()
+                            .map(p -> p.getSd().getCols())
+                            .flatMap(Collection::stream)
+                            .distinct()
+                            .collect(Collectors.toList());
+
+            Map<String, List<ColumnStatisticsObj>> partitionColumnStatistics =
+                    client.getPartitionColumnStatistics(
+                            partitionsTuple3.f1.get(0).getDbName(),
+                            partitionsTuple3.f1.get(0).getTableName(),
+                            partitionsTuple3.f2,
+                            getFieldNames(distinctColumns));
+
+            for (String partitionName : partitionsTuple3.f2) {
+                List<ColumnStatisticsObj> columnStatisticsObjs =
+                        partitionColumnStatistics.get(partitionName);
+                if (columnStatisticsObjs != null && 
!columnStatisticsObjs.isEmpty()) {
+                    result.add(
+                            new CatalogColumnStatistics(
+                                    HiveStatsUtil.createCatalogColumnStats(
+                                            columnStatisticsObjs, 
hiveVersion)));
+                } else {
+                    result.add(CatalogColumnStatistics.UNKNOWN);
+                }
+            }
+
+            result.add(
+                    buildPartitionedByColumnSatistics(
+                            tablePath,
+                            partitionsTuple3
+                                    .f0
+                                    .getPartitionKeys()
+                                    
.get(partitionsTuple3.f0.getPartitionKeys().size() - 1),
+                            partitionsTuple3.f2));
+        } catch (TException e) {
+            throw new CatalogException(
+                    String.format(
+                            "Failed to get table stats of table %s 's 
partitions %s",
+                            tablePath.getFullName(), 
String.valueOf(partitionSpecs)),
+                    e);
+        }
+
+        return result;
+    }
+
+    private CatalogColumnStatistics buildPartitionedByColumnSatistics(
+            ObjectPath tablePath, FieldSchema partitionedByColumn, 
List<String> partitionNames) {
+
+        Long nullCount = null;
+        Long ndv = HiveStatsUtil.max(0L, Long.valueOf(partitionNames.size()));
+
+        CatalogColumnStatisticDataBuilder<? extends 
CatalogColumnStatisticsDataBase> builder = null;
+
+        final CatalogColumnStatisticsDataLong.Builder longBuilder =
+                CatalogColumnStatisticsDataLong.builder();
+        longBuilder.ndv(ndv);
+        final CatalogColumnStatisticsDataDate.Builder dateBuilder =
+                CatalogColumnStatisticsDataDate.builder();
+        dateBuilder.ndv(ndv);
+        final CatalogColumnStatisticsDataDouble.Builder doubleBuilder =
+                CatalogColumnStatisticsDataDouble.builder();
+        doubleBuilder.ndv(ndv);
+        final CatalogColumnStatisticsDataString.Builder stringBuilder =
+                CatalogColumnStatisticsDataString.builder();
+        stringBuilder.ndv(ndv);
+        final CatalogColumnStatisticsDataBoolean.Builder booleanBuilder =
+                CatalogColumnStatisticsDataBoolean.builder();
+        final CatalogColumnStatisticsDataBinary.Builder binaryBuilder =
+                CatalogColumnStatisticsDataBinary.builder();
+
+        for (String partitionName : partitionNames) {
+            String[] partName = partitionName.split(Path.SEPARATOR);
+            String[] partVal = partName[partName.length - 1].split("=");
+            String value = partVal[1];
+            if (value.equals(HIVE_DEFAULT_PARTITION)) {
+                try {
+                    nullCount =

Review Comment:
   Not set nullCount



##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java:
##########
@@ -240,6 +246,84 @@ void testAlterPartitionColumnStatistics() throws Exception 
{
                 catalog.getPartitionColumnStatistics(path1, partitionSpec));
     }
 
+    private ResolvedSchema createSchemaForStatistics() {
+        return new ResolvedSchema(
+                Arrays.asList(
+                        Column.physical("first", DataTypes.STRING()),
+                        Column.physical("four", DataTypes.BOOLEAN()),
+                        Column.physical("five", DataTypes.DOUBLE()),
+                        Column.physical("second", DataTypes.DATE()),
+                        Column.physical("third", DataTypes.INT())),
+                Collections.emptyList(),
+                null);
+    }
+
+    @Test
+    public void testGetTableAndColumnStatistics() throws Exception {
+
+        // create a test table
+        catalog.createDatabase(db1, createDb(), false);
+        final ResolvedSchema resolvedSchema = createSchemaForStatistics();
+        final CatalogTable origin =
+                CatalogTable.of(
+                        
Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
+                        TEST_COMMENT,
+                        createPartitionKeys(),
+                        getBatchTableProperties());
+        CatalogTable catalogTable = new ResolvedCatalogTable(origin, 
resolvedSchema);
+
+        catalog.createTable(path1, catalogTable, false);
+
+        int second = 29;
+        final int partitionFileSize = 20;
+        final String partitionName = "third";
+
+        List<Map<String, String>> partitionSpecs = new ArrayList<>();
+        Map<String, String> partitionSpec = null;
+
+        while (second-- > 10) {
+            int third = 20;
+            while (third-- > 0) {
+                partitionSpec = new HashMap<String, String>();
+                partitionSpec.put("second", "2010-04-" + second + " 00:00:00");
+                partitionSpec.put(partitionName, Integer.toString(third));
+                createPartition(partitionSpec);
+                partitionSpecs.add(partitionSpec);
+            }
+        }
+
+        final List<CatalogPartitionSpec> catalogPartitionSpecList =
+                partitionSpecs.stream()
+                        .map(p -> new CatalogPartitionSpec(p))
+                        .collect(Collectors.toList());
+
+        final List<CatalogTableStatistics> partitionStatistics =
+                catalog.bulkGetPartitionStatistics(path1, 
catalogPartitionSpecList);
+        
assertThat(partitionStatistics.size()).isEqualTo(partitionSpecs.size());
+
+        final List<CatalogColumnStatistics> tableColumnStatistics =
+                catalog.bulkGetPartitionColumnStatistics(path1, 
catalogPartitionSpecList);
+
+        
assertThat(tableColumnStatistics.size()).isEqualTo(catalogPartitionSpecList.size()
 + 1);
+
+        final CatalogColumnStatistics catalogColumnStatistics =
+                tableColumnStatistics.get(tableColumnStatistics.size() - 1);
+        CatalogColumnStatisticsDataLong columnStatisticsDataLong =
+                (CatalogColumnStatisticsDataLong)
+                        
catalogColumnStatistics.getColumnStatisticsData().get(partitionName);
+
+        
assertThat(columnStatisticsDataLong.getNdv()).isEqualTo(catalogPartitionSpecList.size());
+        assertThat(columnStatisticsDataLong.getMin()).isEqualTo(0);
+        
assertThat(columnStatisticsDataLong.getMax()).isEqualTo(partitionFileSize - 1);
+
+        final List<CatalogPartitionSpec> catalogPartitionSpecs = 
catalog.listPartitions(path1);

Review Comment:
   This line can be removed ?



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java:
##########
@@ -566,6 +565,31 @@ CatalogTableStatistics getPartitionStatistics(
             ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
             throws PartitionNotExistException, CatalogException;
 
+    /**
+     * Get a list of statistics of given partitions.
+     *
+     * @param tablePath path of the table
+     * @param partitionSpecs partition specs of partitions that will be used 
to filter out all other
+     *     unrelated statistics, i.e. the statistics fetch will be limited 
within the given
+     *     partitions
+     * @return list of statistics of given partitions
+     * @throws PartitionNotExistException if one partition does not exist
+     * @throws CatalogException in case of any runtime exception
+     */
+    default List<CatalogTableStatistics> bulkGetPartitionStatistics(

Review Comment:
   This method should be tested via GenericInMemoryCatalog



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/StatisticDataUtils.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.stats;
+
+/** Utils class for catalog column statistics. */
+public class StatisticDataUtils {
+
+    public static Double acculmulateAverage(
+            Double oldAvgLength, Long oldCount, Long newLength, Long newCount) 
{
+        if (oldAvgLength == null || oldCount == null) {
+            return Double.valueOf(newLength);
+        }
+        if (newLength == null || newCount == null) {
+            return oldAvgLength;
+        }
+        return oldCount + newCount == 0
+                ? null
+                : (oldAvgLength * oldCount + newLength * newCount) / (oldCount 
+ newCount);
+    }
+
+    public static Long min(Long oldLength, Long newLength) {

Review Comment:
   The parameter name should be more generic



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java:
##########
@@ -311,11 +314,7 @@ void createTable(ObjectPath tablePath, CatalogBaseTable 
table, boolean ignoreIfE
     void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean 
ignoreIfNotExists)
             throws TableNotExistException, CatalogException;
 
-    /**
-     * If true, tables which do not specify a connector will be translated to 
managed tables.
-     *
-     * @see CatalogBaseTable.TableKind#MANAGED
-     */

Review Comment:
   Unnecessary change ?



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/plan/stats/ColumnStats.java:
##########
@@ -211,6 +211,11 @@ public ColumnStats copy() {
      * @return The merged column stats.
      */
     public ColumnStats merge(ColumnStats other) {
+
+        if (other == null) {

Review Comment:
   if other is null or unknown, the ColumnStats should return UNKNOWN



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to