This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
     new 88e416f  [FLINK-18046][hive] Decimal column stats not supported for Hive table
88e416f is described below

commit 88e416f988bc286b7cc0df7bcf0679184e7bb805
Author: Rui Li <li...@apache.org>
AuthorDate: Mon Jun 8 18:41:11 2020 +0800

    [FLINK-18046][hive] Decimal column stats not supported for Hive table

    This closes #12424
---
 .../table/catalog/hive/util/HiveStatsUtil.java     | 51 ++++++++++++++++++++++
 .../catalog/hive/HiveCatalogHiveMetadataTest.java  | 10 +++--
 2 files changed, 58 insertions(+), 3 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveStatsUtil.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveStatsUtil.java
index 9fd4394..d4ae6d2 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveStatsUtil.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveStatsUtil.java
@@ -31,12 +31,15 @@ import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataString;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
 
+import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.Decimal;
+import org.apache.hadoop.hive.metastore.api.DecimalColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
@@ -50,6 +53,9 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
 
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -171,6 +177,20 @@ public class HiveStatsUtil {
 					stringStats.isSetAvgColLen() ? stringStats.getAvgColLen() : null,
 					stringStats.isSetNumDVs() ? stringStats.getNumDVs() : null,
 					stringStats.isSetNumDVs() ? stringStats.getNumNulls() : null);
+		} else if (stats.isSetDecimalStats()) {
+			DecimalColumnStatsData decimalStats = stats.getDecimalStats();
+			// for now, just return CatalogColumnStatisticsDataDouble for decimal columns
+			Double max = null;
+			if (decimalStats.isSetHighValue()) {
+				max = toHiveDecimal(decimalStats.getHighValue()).doubleValue();
+			}
+			Double min = null;
+			if (decimalStats.isSetLowValue()) {
+				min = toHiveDecimal(decimalStats.getLowValue()).doubleValue();
+			}
+			Long ndv = decimalStats.isSetNumDVs() ? decimalStats.getNumDVs() : null;
+			Long nullCount = decimalStats.isSetNumNulls() ? decimalStats.getNumNulls() : null;
+			return new CatalogColumnStatisticsDataDouble(min, max, ndv, nullCount);
 		} else {
 			LOG.warn("Flink does not support converting ColumnStatisticsData '{}' for Hive column type '{}' yet.", stats, colType);
 			return null;
@@ -288,11 +308,42 @@ public class HiveStatsUtil {
 				}
 				return ColumnStatisticsData.binaryStats(hiveBinaryColumnStats);
 			}
+		} else if (type.equals(LogicalTypeRoot.DECIMAL)) {
+			if (colStat instanceof CatalogColumnStatisticsDataDouble) {
+				CatalogColumnStatisticsDataDouble flinkStats = (CatalogColumnStatisticsDataDouble) colStat;
+				DecimalColumnStatsData hiveStats = new DecimalColumnStatsData();
+				if (flinkStats.getMax() != null) {
+					// in older versions we cannot create HiveDecimal from Double, so convert Double to BigDecimal first
+					hiveStats.setHighValue(toThriftDecimal(HiveDecimal.create(BigDecimal.valueOf(flinkStats.getMax()))));
+				}
+				if (flinkStats.getMin() != null) {
+					hiveStats.setLowValue(toThriftDecimal(HiveDecimal.create(BigDecimal.valueOf(flinkStats.getMin()))));
+				}
+				if (flinkStats.getNdv() != null) {
+					hiveStats.setNumDVs(flinkStats.getNdv());
+				}
+				if (flinkStats.getNullCount() != null) {
+					hiveStats.setNumNulls(flinkStats.getNullCount());
+				}
+				return ColumnStatisticsData.decimalStats(hiveStats);
+			}
 		}
 		throw new CatalogException(String.format("Flink does not support converting ColumnStats '%s' for Hive column " +
 				"type '%s' yet", colStat, colType));
 	}
 
+	private static Decimal toThriftDecimal(HiveDecimal hiveDecimal) {
+		// the constructor signature changed in 3.x. use default constructor and set each field...
+		Decimal res = new Decimal();
+		res.setUnscaled(ByteBuffer.wrap(hiveDecimal.unscaledValue().toByteArray()));
+		res.setScale((short) hiveDecimal.scale());
+		return res;
+	}
+
+	private static HiveDecimal toHiveDecimal(Decimal decimal) {
+		return HiveDecimal.create(new BigInteger(decimal.getUnscaled()), decimal.getScale());
+	}
+
 	public static int parsePositiveIntStat(Map<String, String> parameters, String key) {
 		String value = parameters.get(key);
 		if (value == null) {
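The two new helpers translate between the metastore's Thrift-level Decimal (unscaled bytes plus a scale) and HiveDecimal, and the decimal branch maps decimal stats onto CatalogColumnStatisticsDataDouble. A minimal, self-contained sketch of that round trip, assuming the hive-common and hive-metastore jars are on the classpath; the class name and sample value are illustrative (the value borrows the "seventh" column max from the test change below):

    import org.apache.hadoop.hive.common.type.HiveDecimal;
    import org.apache.hadoop.hive.metastore.api.Decimal;

    import java.math.BigDecimal;
    import java.math.BigInteger;
    import java.nio.ByteBuffer;

    public class DecimalStatsRoundTrip {

        // Mirrors the patch's toThriftDecimal: HiveDecimal -> Thrift Decimal.
        static Decimal toThriftDecimal(HiveDecimal hiveDecimal) {
            Decimal res = new Decimal();
            res.setUnscaled(ByteBuffer.wrap(hiveDecimal.unscaledValue().toByteArray()));
            res.setScale((short) hiveDecimal.scale());
            return res;
        }

        // Mirrors the patch's toHiveDecimal: Thrift Decimal -> HiveDecimal.
        static HiveDecimal toHiveDecimal(Decimal decimal) {
            return HiveDecimal.create(new BigInteger(decimal.getUnscaled()), decimal.getScale());
        }

        public static void main(String[] args) {
            // Flink keeps decimal min/max as Double; older Hive versions cannot build
            // a HiveDecimal from a Double directly, hence the detour through BigDecimal.
            double flinkMax = 99.456;
            Decimal thrift = toThriftDecimal(HiveDecimal.create(BigDecimal.valueOf(flinkMax)));
            double restored = toHiveDecimal(thrift).doubleValue();
            System.out.println(flinkMax == restored); // prints true
        }
    }

Using the default Thrift constructor plus setters, rather than the Decimal constructor, keeps the helper compatible across Hive versions, since the constructor signature changed in Hive 3.x.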
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
index 88ef5f6..c0144ab 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
@@ -123,9 +123,11 @@ public class HiveCatalogHiveMetadataTest extends HiveCatalogMetadataTestBase {
 				.field("third", DataTypes.BOOLEAN())
 				.field("fourth", DataTypes.DOUBLE())
 				.field("fifth", DataTypes.BIGINT())
-				.field("sixth", DataTypes.BYTES());
+				.field("sixth", DataTypes.BYTES())
+				.field("seventh", DataTypes.DECIMAL(10, 3))
+				.field("eighth", DataTypes.DECIMAL(30, 3));
 		if (supportDateStats) {
-			builder.field("seventh", DataTypes.DATE());
+			builder.field("ninth", DataTypes.DATE());
 		}
 		TableSchema tableSchema = builder.build();
 		CatalogTable catalogTable = new CatalogTableImpl(tableSchema, getBatchTableProperties(), TEST_COMMENT);
@@ -137,8 +139,10 @@ public class HiveCatalogHiveMetadataTest extends HiveCatalogMetadataTestBase {
 		columnStatisticsDataBaseMap.put("fourth", new CatalogColumnStatisticsDataDouble(15.02, 20.01, 3L, 10L));
 		columnStatisticsDataBaseMap.put("fifth", new CatalogColumnStatisticsDataLong(0L, 20L, 3L, 2L));
 		columnStatisticsDataBaseMap.put("sixth", new CatalogColumnStatisticsDataBinary(150L, 20D, 3L));
+		columnStatisticsDataBaseMap.put("seventh", new CatalogColumnStatisticsDataDouble(1.23, 99.456, 100L, 0L));
+		columnStatisticsDataBaseMap.put("eighth", new CatalogColumnStatisticsDataDouble(0.123, 123456.789, 5723L, 19L));
 		if (supportDateStats) {
-			columnStatisticsDataBaseMap.put("seventh", new CatalogColumnStatisticsDataDate(
+			columnStatisticsDataBaseMap.put("ninth", new CatalogColumnStatisticsDataDate(
 					new Date(71L), new Date(17923L), 132L, 0L));
 		}
 		CatalogColumnStatistics catalogColumnStatistics = new CatalogColumnStatistics(columnStatisticsDataBaseMap);
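As the "for now, just return CatalogColumnStatisticsDataDouble for decimal columns" comment indicates, decimal min/max are held as doubles on the Flink side, so the extremes of a high-precision column such as the DECIMAL(30, 3) field in the test are stored only approximately. A hedged sketch of that limitation; the class name and value are hypothetical, chosen to exceed the roughly 15 to 17 significant digits a double can carry:

    import java.math.BigDecimal;

    public class DecimalStatsPrecision {
        public static void main(String[] args) {
            // Hypothetical DECIMAL(30, 3) maximum with 30 significant digits.
            BigDecimal exact = new BigDecimal("123456789012345678901234567.890");
            double asDouble = exact.doubleValue();              // what the double-based stats keep
            BigDecimal restored = BigDecimal.valueOf(asDouble); // what a reader would recover
            System.out.println(exact.compareTo(restored) != 0); // prints true: the extreme is approximate
        }
    }

For min/max used in cardinality and range estimation this approximation is generally acceptable, which is presumably why the patch takes this route rather than extending Flink's stats model.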