This is an automated email from the ASF dual-hosted git repository.
godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new fe392645421 [FLINK-29059][table-planner] Fix the existing column stats are deleted incorrectly when analyze table for partial columns
fe392645421 is described below
commit fe392645421d10923c75cd5438b91d9ed55900d3
Author: zhengyunhong.zyh <[email protected]>
AuthorDate: Wed Aug 24 11:51:29 2022 +0800
[FLINK-29059][table-planner] Fix the existing column stats are deleted incorrectly when analyze table for partial columns
This closes #20672
---
.../flink/table/api/internal/AnalyzeTableUtil.java | 32 ++++++--
.../runtime/batch/sql/AnalyzeTableITCase.java | 92 ++++++++++++++++++++++
2 files changed, 118 insertions(+), 6 deletions(-)
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/AnalyzeTableUtil.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/AnalyzeTableUtil.java
index 37471ed52ae..45d324d4e48 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/AnalyzeTableUtil.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/AnalyzeTableUtil.java
@@ -85,10 +85,15 @@ public class AnalyzeTableUtil {
executeSqlAndGenerateStatistics(tableEnv, columns,
statSql);
CatalogTableStatistics tableStat = result.f0;
catalog.alterPartitionStatistics(objectPath, partitionSpec,
tableStat, false);
- CatalogColumnStatistics columnStat = result.f1;
- if (columnStat != null) {
+ CatalogColumnStatistics newColumnStat = result.f1;
+ if (newColumnStat != null) {
+ CatalogColumnStatistics oldColumnStat =
+ catalog.getPartitionColumnStatistics(objectPath,
partitionSpec);
+ // merge stats
+ CatalogColumnStatistics mergedColumnStatistics =
+ mergeColumnStatistics(oldColumnStat,
newColumnStat);
catalog.alterPartitionColumnStatistics(
- objectPath, partitionSpec, columnStat, false);
+ objectPath, partitionSpec, mergedColumnStatistics,
false);
}
}
} else {
@@ -97,14 +102,29 @@ public class AnalyzeTableUtil {
executeSqlAndGenerateStatistics(tableEnv, columns,
statSql);
CatalogTableStatistics tableStat = result.f0;
catalog.alterTableStatistics(objectPath, tableStat, false);
- CatalogColumnStatistics columnStat = result.f1;
- if (columnStat != null) {
- catalog.alterTableColumnStatistics(objectPath, columnStat,
false);
+ CatalogColumnStatistics newColumnStat = result.f1;
+ if (newColumnStat != null) {
+ CatalogColumnStatistics oldColumnStat =
+ catalog.getTableColumnStatistics(objectPath);
+ // merge stats.
+ CatalogColumnStatistics mergedColumnStatistics =
+ mergeColumnStatistics(oldColumnStat, newColumnStat);
+ catalog.alterTableColumnStatistics(objectPath,
mergedColumnStatistics, false);
}
}
return TableResultImpl.TABLE_RESULT_OK;
}
+ /**
+ * Merges freshly computed column statistics into the column statistics currently stored in the
+ * catalog.
+ *
+ * <p>Entries of {@code newColumnStatistics} overwrite entries with the same column name in
+ * {@code oldColumnStatistics}; columns that were not part of the current ANALYZE keep their
+ * existing statistics. The old statistics are copied before merging, so the instance returned by
+ * the catalog is never mutated in place.
+ *
+ * @param oldColumnStatistics column statistics read back from the catalog; may be {@code null},
+ * in which case {@code newColumnStatistics} is returned unchanged
+ * @param newColumnStatistics column statistics computed by the current ANALYZE TABLE statement
+ * @return the merged column statistics
+ */
+ private static CatalogColumnStatistics mergeColumnStatistics(
+ CatalogColumnStatistics oldColumnStatistics,
+ CatalogColumnStatistics newColumnStatistics) {
+ // A catalog implementation might return null instead of an empty statistics object when no
+ // column stats exist yet; guard so ANALYZE does not fail with an NPE after the table stats
+ // have already been written.
+ if (oldColumnStatistics == null) {
+ return newColumnStatistics;
+ }
+ CatalogColumnStatistics columnStatistics = oldColumnStatistics.copy();
+ columnStatistics
+ .getColumnStatisticsData()
+ .putAll(newColumnStatistics.getColumnStatisticsData());
+ return columnStatistics;
+ }
+
private static Tuple2<CatalogTableStatistics, CatalogColumnStatistics>
executeSqlAndGenerateStatistics(
TableEnvironmentImpl tableEnv, List<Column> columns,
String statSql) {
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/AnalyzeTableITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/AnalyzeTableITCase.java
index 2ba4e0b4bf0..d61bb8440bd 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/AnalyzeTableITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/AnalyzeTableITCase.java
@@ -24,6 +24,7 @@ import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogPartitionImpl;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBase;
import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBoolean;
@@ -353,6 +354,36 @@ public class AnalyzeTableITCase extends BatchTestBase {
.isEqualTo(new CatalogColumnStatistics(columnStatisticsData2));
}
+ @Test
+ public void testNonPartitionTableAnalyzePartialColumnsWithSomeColumnsHaveColumnStats()
+ throws TableNotExistException {
+ // When some columns already have column stats, analyzing the table for a subset of columns
+ // must merge the new stats into the existing ones instead of overwriting them.
+ // Step 1: compute column stats for columns f, a and d only.
+ tEnv.executeSql("analyze table NonPartitionTable compute statistics for columns f, a, d");
+ ObjectPath path = new ObjectPath(tEnv.getCurrentDatabase(), "NonPartitionTable");
+ Catalog catalog = tEnv.getCatalog(tEnv.getCurrentCatalog()).get();
+ assertThat(catalog.getTableStatistics(path))
+ .isEqualTo(new CatalogTableStatistics(5L, -1, -1L, -1L));
+ Map<String, CatalogColumnStatisticsDataBase> columnStatisticsData = new HashMap<>();
+ columnStatisticsData.put("a", new CatalogColumnStatisticsDataBoolean(2L, 2L, 1L));
+ columnStatisticsData.put("f", new CatalogColumnStatisticsDataDouble(-1.123d, 3.4d, 4L, 1L));
+ columnStatisticsData.put(
+ "d",
+ new CatalogColumnStatisticsDataLong(
+ (long) Integer.MIN_VALUE, (long) Integer.MAX_VALUE, 4L, 1L));
+ assertThat(catalog.getTableColumnStatistics(path))
+ .isEqualTo(new CatalogColumnStatistics(columnStatisticsData));
+
+ // Step 2: analyze an overlapping column set (d, e). The stats of a and f must be kept,
+ // d is recomputed from the same data and e is added.
+ tEnv.executeSql("analyze table NonPartitionTable compute statistics for columns d, e");
+ assertThat(catalog.getTableStatistics(path))
+ .isEqualTo(new CatalogTableStatistics(5L, -1, -1L, -1L));
+ columnStatisticsData.put(
+ "e", new CatalogColumnStatisticsDataLong(Long.MIN_VALUE, Long.MAX_VALUE, 4L, 1L));
+ assertThat(catalog.getTableColumnStatistics(path))
+ .isEqualTo(new CatalogColumnStatistics(columnStatisticsData));
+ }
+
@Test
public void testPartitionTableWithoutPartition() {
assertThatThrownBy(() -> tEnv.executeSql("analyze table PartitionTable
compute statistics"))
@@ -537,6 +568,67 @@ public class AnalyzeTableITCase extends BatchTestBase {
assertPartitionStatistics(path, "e=3,a=5", -1L);
}
+ @Test
+ public void testPartitionTableAnalyzePartialColumnsWithSomeColumnsHaveColumnStats()
+ throws Exception {
+ // When some columns of a partition already have column stats, analyzing that partition for
+ // a subset of columns must merge the new stats into the existing ones instead of
+ // overwriting them.
+ // Step 1: compute column stats for columns a, b and c of partition (e=2, a=5).
+ tEnv.executeSql(
+ "analyze table PartitionTable partition(e=2, a=5) compute statistics for columns a, b, c");
+ ObjectPath path = new ObjectPath(tEnv.getCurrentDatabase(), "PartitionTable");
+ Catalog catalog = tEnv.getCatalog(tEnv.getCurrentCatalog()).get();
+ assertThat(catalog.getTableStatistics(path))
+ .isEqualTo(new CatalogTableStatistics(-1L, -1, -1L, -1L));
+ Map<String, CatalogColumnStatisticsDataBase> columnStatisticsData = new HashMap<>();
+ columnStatisticsData.put("a", new CatalogColumnStatisticsDataLong(5L, 5L, 1L, 0L));
+ columnStatisticsData.put("b", new CatalogColumnStatisticsDataLong(14L, 15L, 2L, 0L));
+ columnStatisticsData.put("c", new CatalogColumnStatisticsDataLong(13L, 14L, 2L, 0L));
+ assertPartitionStatistics(
+ path, "e=2,a=5", 2L, new CatalogColumnStatistics(columnStatisticsData));
+
+ // Step 2: analyze an overlapping column set (c, d) of the same partition. The stats of a
+ // and b must be kept, c is recomputed from the same data and d is added.
+ tEnv.executeSql(
+ "analyze table PartitionTable partition(e=2, a=5) compute statistics for columns c, d");
+ assertThat(catalog.getTableStatistics(path))
+ .isEqualTo(new CatalogTableStatistics(-1L, -1, -1L, -1L));
+ columnStatisticsData.put("d", new CatalogColumnStatisticsDataString(3L, 3.0, 2L, 0L));
+ assertPartitionStatistics(
+ path, "e=2,a=5", 2L, new CatalogColumnStatistics(columnStatisticsData));
+ }
+
+ @Test
+ public void testPartitionTableAnalyzePartialPartitionWithSomePartitionHaveColumnStats()
+ throws Exception {
+ // Column stats of different partitions are isolated: analyzing one partition must not
+ // affect the column stats already stored for another partition.
+ // Step 1: compute column stats for partition (e=2, a=5).
+ tEnv.executeSql(
+ "analyze table PartitionTable partition(e=2, a=5) compute statistics for columns a, b, c");
+ ObjectPath path = new ObjectPath(tEnv.getCurrentDatabase(), "PartitionTable");
+ Catalog catalog = tEnv.getCatalog(tEnv.getCurrentCatalog()).get();
+ assertThat(catalog.getTableStatistics(path))
+ .isEqualTo(new CatalogTableStatistics(-1L, -1, -1L, -1L));
+ Map<String, CatalogColumnStatisticsDataBase> columnStatisticsData1 = new HashMap<>();
+ columnStatisticsData1.put("a", new CatalogColumnStatisticsDataLong(5L, 5L, 1L, 0L));
+ columnStatisticsData1.put("b", new CatalogColumnStatisticsDataLong(14L, 15L, 2L, 0L));
+ columnStatisticsData1.put("c", new CatalogColumnStatisticsDataLong(13L, 14L, 2L, 0L));
+ assertPartitionStatistics(
+ path, "e=2,a=5", 2L, new CatalogColumnStatistics(columnStatisticsData1));
+
+ // Step 2: compute column stats for another partition (e=2, a=4).
+ tEnv.executeSql(
+ "analyze table PartitionTable partition(e=2, a=4) compute statistics for columns a, d");
+ assertThat(catalog.getTableStatistics(path))
+ .isEqualTo(new CatalogTableStatistics(-1L, -1, -1L, -1L));
+ // The previously analyzed partition keeps its column stats unchanged.
+ assertPartitionStatistics(
+ path, "e=2,a=5", 2L, new CatalogColumnStatistics(columnStatisticsData1));
+ // The newly analyzed partition has stats for exactly the requested columns.
+ Map<String, CatalogColumnStatisticsDataBase> columnStatisticsData2 = new HashMap<>();
+ columnStatisticsData2.put("a", new CatalogColumnStatisticsDataLong(4L, 4L, 1L, 0L));
+ columnStatisticsData2.put("d", new CatalogColumnStatisticsDataString(3L, 3.0, 2L, 0L));
+ assertPartitionStatistics(
+ path, "e=2,a=4", 2L, new CatalogColumnStatistics(columnStatisticsData2));
+ }
+
private void assertPartitionStatistics(ObjectPath path, String
partitionSpec, long rowCount)
throws Exception {
CatalogPartitionSpec spec = createCatalogPartitionSpec(partitionSpec);