This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 64bb08d  [FLINK-13048][hive] support decimal in Flink's integration with Hive user defined functions
64bb08d is described below

commit 64bb08d776ea4d153a74bbe992b59e52c3044425
Author: bowen.li <[email protected]>
AuthorDate: Mon Jul 1 16:29:36 2019 -0700

    [FLINK-13048][hive] support decimal in Flink's integration with Hive user defined functions
    
    This PR adds support for decimal in Flink's integration with Hive user defined functions.
    
    This closes #8941.
---
 .../functions/hive/conversion/HiveInspectors.java  | 17 ++++++++++-----
 .../table/functions/hive/HiveGenericUDAFTest.java  | 21 +++++++++++++++++++
 .../table/functions/hive/HiveGenericUDFTest.java   | 24 +++++++++++-----------
 .../table/functions/hive/HiveSimpleUDFTest.java    | 13 ++++++++++++
 4 files changed, 58 insertions(+), 17 deletions(-)
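
For orientation before the diff: the patch makes HiveInspectors convert
between Flink's java.math.BigDecimal and Hive's HiveDecimal in both
directions. Below is a minimal standalone sketch of that round trip; it
is not part of the patch, but HiveDecimal.create and bigDecimalValue are
the same two Hive calls used in the hunks that follow:

    import java.math.BigDecimal;

    import org.apache.hadoop.hive.common.type.HiveDecimal;

    public class DecimalRoundTripSketch {
        public static void main(String[] args) {
            BigDecimal flinkValue = new BigDecimal("18.666");

            // Flink -> Hive: what the new conversion lambda does
            HiveDecimal hiveValue = HiveDecimal.create(flinkValue);

            // Hive -> Flink: what the new inspector branch does
            BigDecimal back = hiveValue.bigDecimalValue();

            System.out.println(back); // prints 18.666
        }
    }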

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
index bbd0b55..0f9dd67 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hive.serde2.io.ByteWritable;
 import org.apache.hadoop.hive.serde2.io.DateWritable;
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
 import org.apache.hadoop.hive.serde2.io.HiveCharWritable;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable;
 import org.apache.hadoop.hive.serde2.io.ShortWritable;
 import org.apache.hadoop.hive.serde2.io.TimestampWritable;
@@ -58,6 +59,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.DateObjectInspect
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveCharObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveVarcharObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantBinaryObjectInspector;
@@ -94,6 +96,7 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 
+import java.math.BigDecimal;
 import java.sql.Date;
 import java.sql.Timestamp;
 import java.util.ArrayList;
@@ -165,7 +168,6 @@ public class HiveInspectors {
                        case TIMESTAMP:
                                return new JavaConstantTimestampObjectInspector((Timestamp) value);
                        case DECIMAL:
-                               // TODO: Needs more testing
                                return new JavaConstantHiveDecimalObjectInspector((HiveDecimal) value);
                        case BINARY:
                                return new JavaConstantBinaryObjectInspector((byte[]) value);
@@ -200,9 +202,9 @@ public class HiveInspectors {
                                return o -> new HiveChar((String) o, ((CharType) dataType).getLength());
                        } else if (inspector instanceof HiveVarcharObjectInspector) {
                                return o -> new HiveVarchar((String) o, ((VarCharType) dataType).getLength());
+                       } else if (inspector instanceof HiveDecimalObjectInspector) {
+                               return o -> HiveDecimal.create((BigDecimal) o);
                        }
-
-                       // TODO: handle decimal type
                }
 
                if (inspector instanceof ListObjectInspector) {
@@ -299,9 +301,11 @@ public class HiveInspectors {
                                HiveVarcharObjectInspector oi = (HiveVarcharObjectInspector) inspector;

                                return oi.getPrimitiveJavaObject(data).getValue();
-                       }
+                       } else if (inspector instanceof HiveDecimalObjectInspector) {
+                               HiveDecimalObjectInspector oi = (HiveDecimalObjectInspector) inspector;

-                       // TODO: handle decimal type
+                               return oi.getPrimitiveJavaObject(data).bigDecimalValue();
+                       }
                }
 
                if (inspector instanceof ListObjectInspector) {
@@ -392,6 +396,9 @@ public class HiveInspectors {
                } else if (clazz.equals(HiveVarchar.class) || clazz.equals(HiveVarcharWritable.class)) {

                        typeInfo = TypeInfoFactory.varcharTypeInfo;
+               } else if (clazz.equals(HiveDecimal.class) || clazz.equals(HiveDecimalWritable.class)) {
+
+                       typeInfo = TypeInfoFactory.decimalTypeInfo;
                } else {
                        throw new FlinkHiveUDFException(
                                String.format("Class %s is not supported yet", clazz.getName()));
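
The extraction side of the patch goes through Hive's ObjectInspector
machinery. Here is a small sketch of the same call chain against a
standalone inspector; the factory field javaHiveDecimalObjectInspector
comes from Hive's serde2 API (assumed available), not from this patch:

    import java.math.BigDecimal;

    import org.apache.hadoop.hive.common.type.HiveDecimal;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

    public class DecimalInspectorSketch {
        public static void main(String[] args) {
            // A Java-object inspector for Hive decimals, provided by Hive itself
            HiveDecimalObjectInspector oi =
                    PrimitiveObjectInspectorFactory.javaHiveDecimalObjectInspector;

            // The raw datum behind this inspector is a HiveDecimal
            Object data = HiveDecimal.create(new BigDecimal("1.23"));

            // Same two calls as the new decimal branch above
            BigDecimal result = oi.getPrimitiveJavaObject(data).bigDecimalValue();
            System.out.println(result); // prints 1.23
        }
    }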
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDAFTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDAFTest.java
index 94dd0fd..df09107 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDAFTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDAFTest.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver2;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFSum;
 import org.junit.Test;
 
+import java.math.BigDecimal;
 import java.util.Arrays;
 
 import static org.junit.Assert.assertEquals;
@@ -80,6 +81,26 @@ public class HiveGenericUDAFTest {
                udf.merge(acc, Arrays.asList());
 
                assertEquals(6.1d, udf.getValue(acc));
+
+               constantArgs = new Object[] {
+                       null
+               };
+
+               argTypes = new DataType[] {
+                       DataTypes.DECIMAL(5, 3)
+               };
+
+               udf = init(GenericUDAFSum.class, constantArgs, argTypes);
+
+               acc = udf.createAccumulator();
+
+               udf.accumulate(acc, BigDecimal.valueOf(10.111));
+               udf.accumulate(acc, BigDecimal.valueOf(3.222));
+               udf.accumulate(acc, BigDecimal.valueOf(5.333));
+
+               udf.merge(acc, Arrays.asList());
+
+               assertEquals(BigDecimal.valueOf(18.666), udf.getValue(acc));
        }
 
        @Test
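
A note on the new decimal assertion above: BigDecimal.equals is
scale-sensitive, so the expected value only matches if the accumulated
sum keeps scale 3. A quick standalone check of that arithmetic (not
part of the patch):

    import java.math.BigDecimal;

    public class DecimalSumCheck {
        public static void main(String[] args) {
            // BigDecimal.valueOf(double) goes through Double.toString,
            // so each operand below has scale 3, and so does the sum
            BigDecimal sum = BigDecimal.valueOf(10.111)
                    .add(BigDecimal.valueOf(3.222))
                    .add(BigDecimal.valueOf(5.333));

            System.out.println(sum);                                    // 18.666
            System.out.println(sum.equals(BigDecimal.valueOf(18.666))); // true
        }
    }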
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDFTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDFTest.java
index 879e64f..058b5c4 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDFTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDFTest.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hive.ql.udf.generic.GenericUDFStringToMap;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFStruct;
 import org.junit.Test;
 
+import java.math.BigDecimal;
 import java.sql.Date;
 import java.sql.Timestamp;
 import java.util.HashMap;
@@ -201,18 +202,17 @@ public class HiveGenericUDFTest {
 
                assertEquals(0L, udf.eval(-0.1d));
 
-               // TODO: reenable the test when we support decimal for Hive functions
-//             udf = init(
-//                     GenericUDFCeil.class,
-//                     new Object[] {
-//                             null
-//                     },
-//                     new DataType[] {
-//                             DataTypes.DECIMAL(1, 1)
-//                     }
-//             );
-//
-//             assertEquals(0L, udf.eval(BigDecimal.valueOf(-0.1)));
+               udf = init(
+                       GenericUDFCeil.class,
+                       new Object[] {
+                               null
+                       },
+                       new DataType[] {
+                               DataTypes.DECIMAL(2, 1)
+                       }
+               );
+
+               assertEquals(BigDecimal.valueOf(4), udf.eval(BigDecimal.valueOf(3.1d)));
        }
 
        @Test
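
Likewise for the reenabled ceil test above: the expected value is
BigDecimal.valueOf(4) with scale 0 rather than 4.0, which the
scale-sensitive assertEquals implies. The same rounding expressed with
plain BigDecimal, as an illustration only (not the code path the
wrapper uses):

    import java.math.BigDecimal;
    import java.math.RoundingMode;

    public class CeilScaleCheck {
        public static void main(String[] args) {
            // Rounding 3.1 toward positive infinity at scale 0 gives 4
            BigDecimal ceil = new BigDecimal("3.1").setScale(0, RoundingMode.CEILING);

            System.out.println(ceil);                               // 4
            System.out.println(ceil.equals(BigDecimal.valueOf(4))); // true
        }
    }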
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveSimpleUDFTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveSimpleUDFTest.java
index a281a83..4bbe0d0 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveSimpleUDFTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveSimpleUDFTest.java
@@ -29,11 +29,13 @@ import org.apache.hadoop.hive.ql.udf.UDFJson;
 import org.apache.hadoop.hive.ql.udf.UDFMinute;
 import org.apache.hadoop.hive.ql.udf.UDFRand;
 import org.apache.hadoop.hive.ql.udf.UDFRegExpExtract;
+import org.apache.hadoop.hive.ql.udf.UDFToInteger;
 import org.apache.hadoop.hive.ql.udf.UDFUnhex;
 import org.apache.hadoop.hive.ql.udf.UDFWeekOfYear;
 import org.junit.Test;
 
 import java.io.UnsupportedEncodingException;
+import java.math.BigDecimal;
 import java.sql.Date;
 import java.sql.Timestamp;
 
@@ -182,6 +184,17 @@ public class HiveSimpleUDFTest {
        }
 
        @Test
+       public void testUDFToInteger() {
+               HiveSimpleUDF udf = init(
+                       UDFToInteger.class,
+                       new DataType[]{
+                               DataTypes.DECIMAL(5, 3)
+                       });
+
+               assertEquals(1, udf.eval(BigDecimal.valueOf(1.1d)));
+       }
+
+       @Test
        public void testUDFArray_singleArray() {
                Double[] testInputs = new Double[] { 1.1d, 2.2d };
 
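
For reference, the new testUDFToInteger above ultimately dispatches to
Hive's own UDF. A sketch of the direct Hive-side call, assuming
UDFToInteger exposes an evaluate(HiveDecimalWritable) overload (which
the decimal argument in the test suggests, but the diff does not show):

    import java.math.BigDecimal;

    import org.apache.hadoop.hive.common.type.HiveDecimal;
    import org.apache.hadoop.hive.ql.udf.UDFToInteger;
    import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
    import org.apache.hadoop.io.IntWritable;

    public class ToIntegerSketch {
        public static void main(String[] args) {
            UDFToInteger udf = new UDFToInteger();

            // Decimal 1.1 wrapped the way Hive hands it to a simple UDF
            HiveDecimalWritable in =
                    new HiveDecimalWritable(HiveDecimal.create(new BigDecimal("1.1")));

            // Assumed overload; expected to truncate toward zero
            IntWritable out = udf.evaluate(in);
            System.out.println(out.get()); // 1
        }
    }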
