This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 88e416f  [FLINK-18046][hive] Decimal column stats not supported for Hive table
88e416f is described below

commit 88e416f988bc286b7cc0df7bcf0679184e7bb805
Author: Rui Li <li...@apache.org>
AuthorDate: Mon Jun 8 18:41:11 2020 +0800

    [FLINK-18046][hive] Decimal column stats not supported for Hive table
    
    
    This closes #12424
---
 .../table/catalog/hive/util/HiveStatsUtil.java     | 51 ++++++++++++++++++++++
 .../catalog/hive/HiveCatalogHiveMetadataTest.java  | 10 +++--
 2 files changed, 58 insertions(+), 3 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveStatsUtil.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveStatsUtil.java
index 9fd4394..d4ae6d2 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveStatsUtil.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveStatsUtil.java
@@ -31,12 +31,15 @@ import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataString;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
 
+import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.Decimal;
+import org.apache.hadoop.hive.metastore.api.DecimalColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
@@ -50,6 +53,9 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
 
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -171,6 +177,20 @@ public class HiveStatsUtil {
                                        stringStats.isSetAvgColLen() ? stringStats.getAvgColLen() : null,
                                        stringStats.isSetNumDVs() ? stringStats.getNumDVs() : null,
                                        stringStats.isSetNumDVs() ? stringStats.getNumNulls() : null);
+               } else if (stats.isSetDecimalStats()) {
+                       DecimalColumnStatsData decimalStats = stats.getDecimalStats();
+                       // for now, just return CatalogColumnStatisticsDataDouble for decimal columns
+                       Double max = null;
+                       if (decimalStats.isSetHighValue()) {
+                               max = toHiveDecimal(decimalStats.getHighValue()).doubleValue();
+                       }
+                       Double min = null;
+                       if (decimalStats.isSetLowValue()) {
+                               min = toHiveDecimal(decimalStats.getLowValue()).doubleValue();
+                       }
+                       Long ndv = decimalStats.isSetNumDVs() ? decimalStats.getNumDVs() : null;
+                       Long nullCount = decimalStats.isSetNumNulls() ? decimalStats.getNumNulls() : null;
+                       return new CatalogColumnStatisticsDataDouble(min, max, ndv, nullCount);
                } else {
                        LOG.warn("Flink does not support converting ColumnStatisticsData '{}' for Hive column type '{}' yet.", stats, colType);
                        return null;
@@ -288,11 +308,42 @@ public class HiveStatsUtil {
                                }
                                return ColumnStatisticsData.binaryStats(hiveBinaryColumnStats);
                        }
+               } else if (type.equals(LogicalTypeRoot.DECIMAL)) {
+                       if (colStat instanceof CatalogColumnStatisticsDataDouble) {
+                               CatalogColumnStatisticsDataDouble flinkStats = (CatalogColumnStatisticsDataDouble) colStat;
+                               DecimalColumnStatsData hiveStats = new DecimalColumnStatsData();
+                               if (flinkStats.getMax() != null) {
+                                       // in older versions we cannot create HiveDecimal from Double, so convert Double to BigDecimal first
+                                       hiveStats.setHighValue(toThriftDecimal(HiveDecimal.create(BigDecimal.valueOf(flinkStats.getMax()))));
+                               }
+                               if (flinkStats.getMin() != null) {
+                                       hiveStats.setLowValue(toThriftDecimal(HiveDecimal.create(BigDecimal.valueOf(flinkStats.getMin()))));
+                               }
+                               if (flinkStats.getNdv() != null) {
+                                       hiveStats.setNumDVs(flinkStats.getNdv());
+                               }
+                               if (flinkStats.getNullCount() != null) {
+                                       hiveStats.setNumNulls(flinkStats.getNullCount());
+                               }
+                               return ColumnStatisticsData.decimalStats(hiveStats);
+                       }
                }
                throw new CatalogException(String.format("Flink does not support converting ColumnStats '%s' for Hive column " +
                                                                                                "type '%s' yet", colStat, colType));
        }
 
+       private static Decimal toThriftDecimal(HiveDecimal hiveDecimal) {
+               // the constructor signature changed in 3.x. use default constructor and set each field...
+               Decimal res = new Decimal();
+               res.setUnscaled(ByteBuffer.wrap(hiveDecimal.unscaledValue().toByteArray()));
+               res.setScale((short) hiveDecimal.scale());
+               return res;
+       }
+
+       private static HiveDecimal toHiveDecimal(Decimal decimal) {
+               return HiveDecimal.create(new BigInteger(decimal.getUnscaled()), decimal.getScale());
+       }
+
        public static int parsePositiveIntStat(Map<String, String> parameters, String key) {
                String value = parameters.get(key);
                if (value == null) {
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
index 88ef5f6..c0144ab 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
@@ -123,9 +123,11 @@ public class HiveCatalogHiveMetadataTest extends HiveCatalogMetadataTestBase {
                                .field("third", DataTypes.BOOLEAN())
                                .field("fourth", DataTypes.DOUBLE())
                                .field("fifth", DataTypes.BIGINT())
-                               .field("sixth", DataTypes.BYTES());
+                               .field("sixth", DataTypes.BYTES())
+                               .field("seventh", DataTypes.DECIMAL(10, 3))
+                               .field("eighth", DataTypes.DECIMAL(30, 3));
                if (supportDateStats) {
-                       builder.field("seventh", DataTypes.DATE());
+                       builder.field("ninth", DataTypes.DATE());
                }
                TableSchema tableSchema = builder.build();
                CatalogTable catalogTable = new CatalogTableImpl(tableSchema, getBatchTableProperties(), TEST_COMMENT);
@@ -137,8 +139,10 @@ public class HiveCatalogHiveMetadataTest extends HiveCatalogMetadataTestBase {
                columnStatisticsDataBaseMap.put("fourth", new CatalogColumnStatisticsDataDouble(15.02, 20.01, 3L, 10L));
                columnStatisticsDataBaseMap.put("fifth", new CatalogColumnStatisticsDataLong(0L, 20L, 3L, 2L));
                columnStatisticsDataBaseMap.put("sixth", new CatalogColumnStatisticsDataBinary(150L, 20D, 3L));
+               columnStatisticsDataBaseMap.put("seventh", new CatalogColumnStatisticsDataDouble(1.23, 99.456, 100L, 0L));
+               columnStatisticsDataBaseMap.put("eighth", new CatalogColumnStatisticsDataDouble(0.123, 123456.789, 5723L, 19L));
                if (supportDateStats) {
-                       columnStatisticsDataBaseMap.put("seventh", new CatalogColumnStatisticsDataDate(
+                       columnStatisticsDataBaseMap.put("ninth", new CatalogColumnStatisticsDataDate(
                                        new Date(71L), new Date(17923L), 132L, 0L));
                }
                CatalogColumnStatistics catalogColumnStatistics = new CatalogColumnStatistics(columnStatisticsDataBaseMap);

Reply via email to