godfreyhe commented on code in PR #20501:
URL: https://github.com/apache/flink/pull/20501#discussion_r940832517
##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveStatsUtil.java:
##########
@@ -90,6 +102,257 @@ public static Map<String, CatalogColumnStatisticsDataBase>
createCatalogColumnSt
return colStats;
}
+ public static Map<String, CatalogColumnStatisticsDataBase>
createCatalogPartitionColumnStats(
+ HiveMetastoreClientWrapper client,
+ HiveShim hiveShim,
+ Table hiveTable,
+ String partitionName,
+ List<FieldSchema> partitionColsSchema,
+ String defaultPartitionName) {
+ Map<String, CatalogColumnStatisticsDataBase> partitionColumnStats =
new HashMap<>();
+ List<String> partitionCols = new
ArrayList<>(partitionColsSchema.size());
+ List<LogicalType> partitionColsType = new
ArrayList<>(partitionColsSchema.size());
+ for (FieldSchema fieldSchema : partitionColsSchema) {
+ partitionCols.add(fieldSchema.getName());
+ partitionColsType.add(
+ HiveTypeUtil.toFlinkType(
+
TypeInfoUtils.getTypeInfoFromTypeString(fieldSchema.getType()))
+ .getLogicalType());
+ }
+
+ // the partition column and values for the partition column
+ Map<String, Object> partitionColValues = new HashMap<>();
+ CatalogPartitionSpec partitionSpec =
+ HivePartitionUtils.createPartitionSpec(partitionName,
defaultPartitionName);
+ for (int i = 0; i < partitionCols.size(); i++) {
+ String partitionCol = partitionCols.get(i);
+ String partitionStrVal =
partitionSpec.getPartitionSpec().get(partitionCols.get(i));
+ if (partitionStrVal == null) {
+ partitionColValues.put(partitionCol, null);
+ } else {
+ partitionColValues.put(
+ partitionCol,
+ HivePartitionUtils.restorePartitionValueFromType(
+ hiveShim,
+ partitionStrVal,
+ partitionColsType.get(i),
+ defaultPartitionName));
+ }
+ }
+
+ // calculate statistic for each partition column
+ for (int i = 0; i < partitionCols.size(); i++) {
+ Object partitionValue =
partitionColValues.get(partitionCols.get(i));
+ LogicalType logicalType = partitionColsType.get(i);
+ CatalogColumnStatisticsDataBase catalogColumnStatistics =
+ getColumnStatistics(
+ client,
+ hiveTable,
+ logicalType,
+ partitionValue,
+ i,
+ defaultPartitionName);
+ if (catalogColumnStatistics != null) {
+ partitionColumnStats.put(partitionCols.get(i),
catalogColumnStatistics);
+ }
+ }
+
+ return partitionColumnStats;
+ }
+
+ /**
+ * Get statistics for specific partition column.
+ *
+ * @param logicalType the specific partition column's logical type
+ * @param partitionValue the partition value for the specific partition
column
+ * @param partitionColIndex the index of the specific partition
+ * @param defaultPartitionName the default partition name for null value
+ */
+ private static CatalogColumnStatisticsDataBase getColumnStatistics(
+ HiveMetastoreClientWrapper client,
+ Table hiveTable,
+ LogicalType logicalType,
+ Object partitionValue,
+ int partitionColIndex,
+ String defaultPartitionName) {
+ switch (logicalType.getTypeRoot()) {
+ case CHAR:
+ case VARCHAR:
+ {
+ Long maxLength = null;
+ Double avgLength = null;
+ Long nullCount = 0L;
+ if (partitionValue == null) {
+ nullCount =
+ getNullCount(
+ client, hiveTable, partitionColIndex,
defaultPartitionName);
+ } else {
+ long valLength = ((String) partitionValue).length();
+ maxLength = valLength;
+ avgLength = (double) valLength;
+ }
+ return new CatalogColumnStatisticsDataString(
+ maxLength, avgLength, 1L, nullCount);
+ }
+ case BOOLEAN:
+ {
+ long trueCount = 0L;
+ long falseCount = 0L;
+ Long nullCount = 0L;
+ if (partitionValue == null) {
+ nullCount =
+ getNullCount(
+ client, hiveTable, partitionColIndex,
defaultPartitionName);
+ } else {
+ Boolean boolVal = (Boolean) partitionValue;
+ if (boolVal) {
+ trueCount = 1L;
+ } else {
+ falseCount = 1L;
+ }
+ }
+ return new CatalogColumnStatisticsDataBoolean(trueCount,
falseCount, nullCount);
+ }
+ case TINYINT:
+ case SMALLINT:
+ case INTEGER:
+ case BIGINT:
+ {
+ Long min = null;
+ Long max = null;
+ Long nullCount = 0L;
+ if (partitionValue == null) {
+ nullCount =
+ getNullCount(
+ client, hiveTable, partitionColIndex,
defaultPartitionName);
+ } else {
+ min = ((Number) partitionValue).longValue();
+ max = min;
+ }
+ return new CatalogColumnStatisticsDataLong(min, max, 1L,
nullCount);
+ }
+ case FLOAT:
+ case DOUBLE:
+ case DECIMAL:
+ {
+ Double min = null;
+ Double max = null;
+ Long nullCount = 0L;
+ if (partitionValue == null) {
+ nullCount =
+ getNullCount(
+ client, hiveTable, partitionColIndex,
defaultPartitionName);
+ } else {
+ min = ((Number) partitionValue).doubleValue();
+ max = min;
+ }
+ return new CatalogColumnStatisticsDataDouble(min, max, 1L,
nullCount);
+ }
+ case DATE:
+ {
+ Date min = null;
+ Date max = null;
+ Long nullCount = 0L;
+ if (partitionValue == null) {
+ nullCount =
+ getNullCount(
+ client, hiveTable, partitionColIndex,
defaultPartitionName);
+ } else {
+ min = new Date(((LocalDate)
partitionValue).toEpochDay());
+ max = min;
+ }
+ return new CatalogColumnStatisticsDataDate(min, max, 1L,
nullCount);
+ }
+ default:
+ return null;
Review Comment:
Is TIMESTAMP unsupported? If so, please document why; otherwise it should be handled here as well.
##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveStatsUtil.java:
##########
@@ -90,6 +102,257 @@ public static Map<String, CatalogColumnStatisticsDataBase>
createCatalogColumnSt
return colStats;
}
+ public static Map<String, CatalogColumnStatisticsDataBase>
createCatalogPartitionColumnStats(
+ HiveMetastoreClientWrapper client,
+ HiveShim hiveShim,
+ Table hiveTable,
+ String partitionName,
+ List<FieldSchema> partitionColsSchema,
+ String defaultPartitionName) {
+ Map<String, CatalogColumnStatisticsDataBase> partitionColumnStats =
new HashMap<>();
+ List<String> partitionCols = new
ArrayList<>(partitionColsSchema.size());
+ List<LogicalType> partitionColsType = new
ArrayList<>(partitionColsSchema.size());
+ for (FieldSchema fieldSchema : partitionColsSchema) {
+ partitionCols.add(fieldSchema.getName());
+ partitionColsType.add(
+ HiveTypeUtil.toFlinkType(
+
TypeInfoUtils.getTypeInfoFromTypeString(fieldSchema.getType()))
+ .getLogicalType());
+ }
+
+ // the partition column and values for the partition column
+ Map<String, Object> partitionColValues = new HashMap<>();
+ CatalogPartitionSpec partitionSpec =
+ HivePartitionUtils.createPartitionSpec(partitionName,
defaultPartitionName);
+ for (int i = 0; i < partitionCols.size(); i++) {
+ String partitionCol = partitionCols.get(i);
+ String partitionStrVal =
partitionSpec.getPartitionSpec().get(partitionCols.get(i));
+ if (partitionStrVal == null) {
+ partitionColValues.put(partitionCol, null);
+ } else {
+ partitionColValues.put(
+ partitionCol,
+ HivePartitionUtils.restorePartitionValueFromType(
+ hiveShim,
+ partitionStrVal,
+ partitionColsType.get(i),
+ defaultPartitionName));
+ }
+ }
+
+ // calculate statistic for each partition column
+ for (int i = 0; i < partitionCols.size(); i++) {
+ Object partitionValue =
partitionColValues.get(partitionCols.get(i));
+ LogicalType logicalType = partitionColsType.get(i);
+ CatalogColumnStatisticsDataBase catalogColumnStatistics =
+ getColumnStatistics(
+ client,
+ hiveTable,
+ logicalType,
+ partitionValue,
+ i,
+ defaultPartitionName);
+ if (catalogColumnStatistics != null) {
+ partitionColumnStats.put(partitionCols.get(i),
catalogColumnStatistics);
+ }
+ }
+
+ return partitionColumnStats;
+ }
+
+ /**
+ * Get statistics for specific partition column.
+ *
+ * @param logicalType the specific partition column's logical type
+ * @param partitionValue the partition value for the specific partition
column
+ * @param partitionColIndex the index of the specific partition
+ * @param defaultPartitionName the default partition name for null value
+ */
+ private static CatalogColumnStatisticsDataBase getColumnStatistics(
Review Comment:
How about renaming this method to `getPartitionColumnStats`?
##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveStatsUtil.java:
##########
@@ -90,6 +102,257 @@ public static Map<String, CatalogColumnStatisticsDataBase>
createCatalogColumnSt
return colStats;
}
+ public static Map<String, CatalogColumnStatisticsDataBase>
createCatalogPartitionColumnStats(
Review Comment:
How about renaming this method to `getCatalogPartitionColumnStats`?
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogTableStatistics.java:
##########
@@ -27,8 +27,11 @@
/** Statistics for a non-partitioned table or a partition of a partitioned
table. */
@PublicEvolving
public class CatalogTableStatistics {
+
public static final CatalogTableStatistics UNKNOWN = new
CatalogTableStatistics(-1, -1, -1, -1);
+ public static final CatalogTableStatistics EMPTY = new
CatalogTableStatistics(0, 0, 0, 0);
Review Comment:
This blank line is unnecessary; please remove it.
##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveStatsUtil.java:
##########
@@ -90,6 +102,257 @@ public static Map<String, CatalogColumnStatisticsDataBase>
createCatalogColumnSt
return colStats;
}
+ public static Map<String, CatalogColumnStatisticsDataBase>
createCatalogPartitionColumnStats(
+ HiveMetastoreClientWrapper client,
+ HiveShim hiveShim,
+ Table hiveTable,
+ String partitionName,
+ List<FieldSchema> partitionColsSchema,
+ String defaultPartitionName) {
+ Map<String, CatalogColumnStatisticsDataBase> partitionColumnStats =
new HashMap<>();
+ List<String> partitionCols = new
ArrayList<>(partitionColsSchema.size());
+ List<LogicalType> partitionColsType = new
ArrayList<>(partitionColsSchema.size());
+ for (FieldSchema fieldSchema : partitionColsSchema) {
+ partitionCols.add(fieldSchema.getName());
+ partitionColsType.add(
+ HiveTypeUtil.toFlinkType(
+
TypeInfoUtils.getTypeInfoFromTypeString(fieldSchema.getType()))
+ .getLogicalType());
+ }
+
+ // the partition column and values for the partition column
+ Map<String, Object> partitionColValues = new HashMap<>();
+ CatalogPartitionSpec partitionSpec =
+ HivePartitionUtils.createPartitionSpec(partitionName,
defaultPartitionName);
+ for (int i = 0; i < partitionCols.size(); i++) {
+ String partitionCol = partitionCols.get(i);
+ String partitionStrVal =
partitionSpec.getPartitionSpec().get(partitionCols.get(i));
+ if (partitionStrVal == null) {
+ partitionColValues.put(partitionCol, null);
+ } else {
+ partitionColValues.put(
+ partitionCol,
+ HivePartitionUtils.restorePartitionValueFromType(
+ hiveShim,
+ partitionStrVal,
+ partitionColsType.get(i),
+ defaultPartitionName));
+ }
+ }
+
+ // calculate statistic for each partition column
+ for (int i = 0; i < partitionCols.size(); i++) {
+ Object partitionValue =
partitionColValues.get(partitionCols.get(i));
+ LogicalType logicalType = partitionColsType.get(i);
+ CatalogColumnStatisticsDataBase catalogColumnStatistics =
+ getColumnStatistics(
+ client,
+ hiveTable,
+ logicalType,
+ partitionValue,
+ i,
+ defaultPartitionName);
+ if (catalogColumnStatistics != null) {
+ partitionColumnStats.put(partitionCols.get(i),
catalogColumnStatistics);
+ }
+ }
+
+ return partitionColumnStats;
+ }
+
+ /**
+ * Get statistics for specific partition column.
+ *
+ * @param logicalType the specific partition column's logical type
+ * @param partitionValue the partition value for the specific partition
column
+ * @param partitionColIndex the index of the specific partition
+ * @param defaultPartitionName the default partition name for null value
+ */
+ private static CatalogColumnStatisticsDataBase getColumnStatistics(
+ HiveMetastoreClientWrapper client,
+ Table hiveTable,
+ LogicalType logicalType,
+ Object partitionValue,
+ int partitionColIndex,
+ String defaultPartitionName) {
+ switch (logicalType.getTypeRoot()) {
+ case CHAR:
+ case VARCHAR:
+ {
+ Long maxLength = null;
+ Double avgLength = null;
+ Long nullCount = 0L;
+ if (partitionValue == null) {
+ nullCount =
+ getNullCount(
+ client, hiveTable, partitionColIndex,
defaultPartitionName);
+ } else {
+ long valLength = ((String) partitionValue).length();
+ maxLength = valLength;
+ avgLength = (double) valLength;
+ }
+ return new CatalogColumnStatisticsDataString(
+ maxLength, avgLength, 1L, nullCount);
+ }
+ case BOOLEAN:
+ {
+ long trueCount = 0L;
+ long falseCount = 0L;
+ Long nullCount = 0L;
+ if (partitionValue == null) {
+ nullCount =
+ getNullCount(
+ client, hiveTable, partitionColIndex,
defaultPartitionName);
+ } else {
+ Boolean boolVal = (Boolean) partitionValue;
+ if (boolVal) {
+ trueCount = 1L;
+ } else {
+ falseCount = 1L;
+ }
+ }
+ return new CatalogColumnStatisticsDataBoolean(trueCount,
falseCount, nullCount);
+ }
+ case TINYINT:
+ case SMALLINT:
+ case INTEGER:
+ case BIGINT:
+ {
+ Long min = null;
+ Long max = null;
+ Long nullCount = 0L;
+ if (partitionValue == null) {
+ nullCount =
+ getNullCount(
+ client, hiveTable, partitionColIndex,
defaultPartitionName);
+ } else {
+ min = ((Number) partitionValue).longValue();
+ max = min;
+ }
+ return new CatalogColumnStatisticsDataLong(min, max, 1L,
nullCount);
+ }
+ case FLOAT:
+ case DOUBLE:
+ case DECIMAL:
+ {
+ Double min = null;
+ Double max = null;
+ Long nullCount = 0L;
+ if (partitionValue == null) {
+ nullCount =
+ getNullCount(
+ client, hiveTable, partitionColIndex,
defaultPartitionName);
+ } else {
+ min = ((Number) partitionValue).doubleValue();
+ max = min;
+ }
+ return new CatalogColumnStatisticsDataDouble(min, max, 1L,
nullCount);
+ }
+ case DATE:
+ {
+ Date min = null;
+ Date max = null;
+ Long nullCount = 0L;
+ if (partitionValue == null) {
+ nullCount =
+ getNullCount(
+ client, hiveTable, partitionColIndex,
defaultPartitionName);
+ } else {
+ min = new Date(((LocalDate)
partitionValue).toEpochDay());
+ max = min;
+ }
+ return new CatalogColumnStatisticsDataDate(min, max, 1L,
nullCount);
+ }
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Get the null count for the {@param partitionColIndex} partition column
in table {@param
+ * hiveTable}.
+ *
+ * <p>To get the null count, it will first list all the partitions whose
{@param
+ * partitionColIndex} partition column is null, and merge the partition's
statistic to get the
+ * total rows, which is exactly null count for the {@param
partitionColIndex} partition column.
+ */
+ private static Long getNullCount(
Review Comment:
How about renaming this method to `getPartitionColumnNullCount`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]