This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 64bb08d [FLINK-13048][hive] support decimal in Flink's integration
with Hive user defined functions
64bb08d is described below
commit 64bb08d776ea4d153a74bbe992b59e52c3044425
Author: bowen.li <[email protected]>
AuthorDate: Mon Jul 1 16:29:36 2019 -0700
[FLINK-13048][hive] support decimal in Flink's integration with Hive user
defined functions
This PR adds support for decimal in Flink's integration with Hive user
defined functions.
This closes #8941.
---
.../functions/hive/conversion/HiveInspectors.java | 17 ++++++++++-----
.../table/functions/hive/HiveGenericUDAFTest.java | 21 +++++++++++++++++++
.../table/functions/hive/HiveGenericUDFTest.java | 24 +++++++++++-----------
.../table/functions/hive/HiveSimpleUDFTest.java | 13 ++++++++++++
4 files changed, 58 insertions(+), 17 deletions(-)
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
index bbd0b55..0f9dd67 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hive.serde2.io.ByteWritable;
import org.apache.hadoop.hive.serde2.io.DateWritable;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
import org.apache.hadoop.hive.serde2.io.HiveCharWritable;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable;
import org.apache.hadoop.hive.serde2.io.ShortWritable;
import org.apache.hadoop.hive.serde2.io.TimestampWritable;
@@ -58,6 +59,7 @@ import
org.apache.hadoop.hive.serde2.objectinspector.primitive.DateObjectInspect
import
org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
import
org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector;
import
org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveCharObjectInspector;
+import
org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector;
import
org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveVarcharObjectInspector;
import
org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
import
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantBinaryObjectInspector;
@@ -94,6 +96,7 @@ import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
+import java.math.BigDecimal;
import java.sql.Date;
import java.sql.Timestamp;
import java.util.ArrayList;
@@ -165,7 +168,6 @@ public class HiveInspectors {
case TIMESTAMP:
return new
JavaConstantTimestampObjectInspector((Timestamp) value);
case DECIMAL:
- // TODO: Needs more testing
return new
JavaConstantHiveDecimalObjectInspector((HiveDecimal) value);
case BINARY:
return new
JavaConstantBinaryObjectInspector((byte[]) value);
@@ -200,9 +202,9 @@ public class HiveInspectors {
return o -> new HiveChar((String) o,
((CharType) dataType).getLength());
} else if (inspector instanceof
HiveVarcharObjectInspector) {
return o -> new HiveVarchar((String) o,
((VarCharType) dataType).getLength());
+ } else if (inspector instanceof
HiveDecimalObjectInspector) {
+ return o -> HiveDecimal.create((BigDecimal) o);
}
-
- // TODO: handle decimal type
}
if (inspector instanceof ListObjectInspector) {
@@ -299,9 +301,11 @@ public class HiveInspectors {
HiveVarcharObjectInspector oi =
(HiveVarcharObjectInspector) inspector;
return
oi.getPrimitiveJavaObject(data).getValue();
- }
+ } else if (inspector instanceof
HiveDecimalObjectInspector) {
+ HiveDecimalObjectInspector oi =
(HiveDecimalObjectInspector) inspector;
- // TODO: handle decimal type
+ return
oi.getPrimitiveJavaObject(data).bigDecimalValue();
+ }
}
if (inspector instanceof ListObjectInspector) {
@@ -392,6 +396,9 @@ public class HiveInspectors {
} else if (clazz.equals(HiveVarchar.class) ||
clazz.equals(HiveVarcharWritable.class)) {
typeInfo = TypeInfoFactory.varcharTypeInfo;
+ } else if (clazz.equals(HiveDecimal.class) ||
clazz.equals(HiveDecimalWritable.class)) {
+
+ typeInfo = TypeInfoFactory.decimalTypeInfo;
} else {
throw new FlinkHiveUDFException(
String.format("Class %s is not supported yet",
clazz.getName()));
diff --git
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDAFTest.java
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDAFTest.java
index 94dd0fd..df09107 100644
---
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDAFTest.java
+++
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDAFTest.java
@@ -28,6 +28,7 @@ import
org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver2;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFSum;
import org.junit.Test;
+import java.math.BigDecimal;
import java.util.Arrays;
import static org.junit.Assert.assertEquals;
@@ -80,6 +81,26 @@ public class HiveGenericUDAFTest {
udf.merge(acc, Arrays.asList());
assertEquals(6.1d, udf.getValue(acc));
+
+ constantArgs = new Object[] {
+ null
+ };
+
+ argTypes = new DataType[] {
+ DataTypes.DECIMAL(5, 3)
+ };
+
+ udf = init(GenericUDAFSum.class, constantArgs, argTypes);
+
+ acc = udf.createAccumulator();
+
+ udf.accumulate(acc, BigDecimal.valueOf(10.111));
+ udf.accumulate(acc, BigDecimal.valueOf(3.222));
+ udf.accumulate(acc, BigDecimal.valueOf(5.333));
+
+ udf.merge(acc, Arrays.asList());
+
+ assertEquals(BigDecimal.valueOf(18.666), udf.getValue(acc));
}
@Test
diff --git
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDFTest.java
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDFTest.java
index 879e64f..058b5c4 100644
---
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDFTest.java
+++
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDFTest.java
@@ -38,6 +38,7 @@ import
org.apache.hadoop.hive.ql.udf.generic.GenericUDFStringToMap;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFStruct;
import org.junit.Test;
+import java.math.BigDecimal;
import java.sql.Date;
import java.sql.Timestamp;
import java.util.HashMap;
@@ -201,18 +202,17 @@ public class HiveGenericUDFTest {
assertEquals(0L, udf.eval(-0.1d));
- // TODO: reenable the test when we support decimal for Hive
functions
-// udf = init(
-// GenericUDFCeil.class,
-// new Object[] {
-// null
-// },
-// new DataType[] {
-// DataTypes.DECIMAL(1, 1)
-// }
-// );
-//
-// assertEquals(0L, udf.eval(BigDecimal.valueOf(-0.1)));
+ udf = init(
+ GenericUDFCeil.class,
+ new Object[] {
+ null
+ },
+ new DataType[] {
+ DataTypes.DECIMAL(2, 1)
+ }
+ );
+
+ assertEquals(BigDecimal.valueOf(4),
udf.eval(BigDecimal.valueOf(3.1d)));
}
@Test
diff --git
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveSimpleUDFTest.java
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveSimpleUDFTest.java
index a281a83..4bbe0d0 100644
---
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveSimpleUDFTest.java
+++
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveSimpleUDFTest.java
@@ -29,11 +29,13 @@ import org.apache.hadoop.hive.ql.udf.UDFJson;
import org.apache.hadoop.hive.ql.udf.UDFMinute;
import org.apache.hadoop.hive.ql.udf.UDFRand;
import org.apache.hadoop.hive.ql.udf.UDFRegExpExtract;
+import org.apache.hadoop.hive.ql.udf.UDFToInteger;
import org.apache.hadoop.hive.ql.udf.UDFUnhex;
import org.apache.hadoop.hive.ql.udf.UDFWeekOfYear;
import org.junit.Test;
import java.io.UnsupportedEncodingException;
+import java.math.BigDecimal;
import java.sql.Date;
import java.sql.Timestamp;
@@ -182,6 +184,17 @@ public class HiveSimpleUDFTest {
}
@Test
+ public void testUDFToInteger() {
+ HiveSimpleUDF udf = init(
+ UDFToInteger.class,
+ new DataType[]{
+ DataTypes.DECIMAL(5, 3)
+ });
+
+ assertEquals(1, udf.eval(BigDecimal.valueOf(1.1d)));
+ }
+
+ @Test
public void testUDFArray_singleArray() {
Double[] testInputs = new Double[] { 1.1d, 2.2d };