This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new e850398 [FLINK-17334][hive] Flink does not support HIVE UDFs with
primitive return types
e850398 is described below
commit e8503986132f0ffaeec91caf5da6ece2d0eb70d3
Author: RoyRuan <[email protected]>
AuthorDate: Wed Apr 29 10:51:45 2020 +0800
[FLINK-17334][hive] Flink does not support HIVE UDFs with primitive return
types
This closes #11876
---
.../flink/table/functions/hive/HiveSimpleUDF.java | 5 +-
.../functions/hive/conversion/HiveInspectors.java | 68 ----------------------
.../table/functions/hive/HiveSimpleUDFTest.java | 61 +++++++++++++++++++
3 files changed, 63 insertions(+), 71 deletions(-)
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveSimpleUDF.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveSimpleUDF.java
index 97f9a45..e6bb663 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveSimpleUDF.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveSimpleUDF.java
@@ -126,11 +126,10 @@ public class HiveSimpleUDF extends
HiveScalarFunction<UDF> {
for (DataType argType : argTypes) {
argTypeInfo.add(HiveTypeUtil.toHiveTypeInfo(argType, false));
}
- Class returnType = hiveFunctionWrapper.createFunction()
-
.getResolver().getEvalMethod(argTypeInfo).getReturnType();
+ Method evalMethod =
hiveFunctionWrapper.createFunction().getResolver().getEvalMethod(argTypeInfo);
return HiveTypeUtil.toFlinkType(
- HiveInspectors.getObjectInspector(hiveShim,
returnType));
+
ObjectInspectorFactory.getReflectionObjectInspector(evalMethod.getGenericReturnType(),
ObjectInspectorFactory.ObjectInspectorOptions.JAVA));
} catch (UDFArgumentException e) {
throw new FlinkHiveUDFException(e);
}
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
index c9d4ffd..3bc313e 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
@@ -36,14 +36,6 @@ import org.apache.flink.types.Row;
import org.apache.hadoop.hive.common.type.HiveChar;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.common.type.HiveVarchar;
-import org.apache.hadoop.hive.serde2.io.ByteWritable;
-import org.apache.hadoop.hive.serde2.io.DateWritable;
-import org.apache.hadoop.hive.serde2.io.DoubleWritable;
-import org.apache.hadoop.hive.serde2.io.HiveCharWritable;
-import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
-import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable;
-import org.apache.hadoop.hive.serde2.io.ShortWritable;
-import org.apache.hadoop.hive.serde2.io.TimestampWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -89,15 +81,8 @@ import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
-import org.apache.hadoop.io.BooleanWritable;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
import javax.annotation.Nonnull;
@@ -348,59 +333,6 @@ public class HiveInspectors {
String.format("Unwrap does not support ObjectInspector
'%s' yet", inspector));
}
- public static ObjectInspector getObjectInspector(HiveShim hiveShim,
Class clazz) {
- TypeInfo typeInfo;
-
- if (clazz.equals(String.class) || clazz.equals(Text.class)) {
-
- typeInfo = TypeInfoFactory.stringTypeInfo;
- } else if (clazz.equals(Boolean.class) ||
clazz.equals(BooleanWritable.class)) {
-
- typeInfo = TypeInfoFactory.booleanTypeInfo;
- } else if (clazz.equals(Byte.class) ||
clazz.equals(ByteWritable.class)) {
-
- typeInfo = TypeInfoFactory.byteTypeInfo;
- } else if (clazz.equals(Short.class) ||
clazz.equals(ShortWritable.class)) {
-
- typeInfo = TypeInfoFactory.shortTypeInfo;
- } else if (clazz.equals(Integer.class) ||
clazz.equals(IntWritable.class)) {
-
- typeInfo = TypeInfoFactory.intTypeInfo;
- } else if (clazz.equals(Long.class) ||
clazz.equals(LongWritable.class)) {
-
- typeInfo = TypeInfoFactory.longTypeInfo;
- } else if (clazz.equals(Float.class) ||
clazz.equals(FloatWritable.class)) {
-
- typeInfo = TypeInfoFactory.floatTypeInfo;
- } else if (clazz.equals(Double.class) ||
clazz.equals(DoubleWritable.class)) {
-
- typeInfo = TypeInfoFactory.doubleTypeInfo;
- } else if (clazz.equals(hiveShim.getDateDataTypeClass()) ||
clazz.equals(DateWritable.class)) {
-
- typeInfo = TypeInfoFactory.dateTypeInfo;
- } else if (clazz.equals(hiveShim.getTimestampDataTypeClass())
|| clazz.equals(TimestampWritable.class)) {
-
- typeInfo = TypeInfoFactory.timestampTypeInfo;
- } else if (clazz.equals(byte[].class) ||
clazz.equals(BytesWritable.class)) {
-
- typeInfo = TypeInfoFactory.binaryTypeInfo;
- } else if (clazz.equals(HiveChar.class) ||
clazz.equals(HiveCharWritable.class)) {
-
- typeInfo = TypeInfoFactory.charTypeInfo;
- } else if (clazz.equals(HiveVarchar.class) ||
clazz.equals(HiveVarcharWritable.class)) {
-
- typeInfo = TypeInfoFactory.varcharTypeInfo;
- } else if (clazz.equals(HiveDecimal.class) ||
clazz.equals(HiveDecimalWritable.class)) {
-
- typeInfo = TypeInfoFactory.decimalTypeInfo;
- } else {
- throw new FlinkHiveUDFException(
- String.format("Class %s is not supported yet",
clazz.getName()));
- }
-
- return getObjectInspector(typeInfo);
- }
-
/**
* Get Hive {@link ObjectInspector} for a Flink {@link DataType}.
*/
diff --git
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveSimpleUDFTest.java
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveSimpleUDFTest.java
index 49cf403..c786613 100644
---
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveSimpleUDFTest.java
+++
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveSimpleUDFTest.java
@@ -24,6 +24,7 @@ import
org.apache.flink.table.catalog.hive.client.HiveShimLoader;
import org.apache.flink.table.functions.hive.util.TestHiveUDFArray;
import org.apache.flink.table.types.DataType;
+import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.ql.udf.UDFBase64;
import org.apache.hadoop.hive.ql.udf.UDFBin;
import org.apache.hadoop.hive.ql.udf.UDFConv;
@@ -50,6 +51,30 @@ public class HiveSimpleUDFTest {
private static HiveShim hiveShim =
HiveShimLoader.loadHiveShim(HiveShimLoader.getHiveVersion());
@Test
+ public void testBooleanUDF() {
+ HiveSimpleUDF udf = init(BooleanUDF.class, new DataType[]{
DataTypes.INT()});
+ assertTrue((boolean) udf.eval(1));
+ }
+
+ @Test
+ public void testFloatUDF() {
+ HiveSimpleUDF udf = init(FloatUDF.class, new DataType[]{
DataTypes.FLOAT()});
+ assertEquals(3.0f, (float) udf.eval(3.0f), 0);
+ }
+
+ @Test
+ public void testIntUDF() {
+ HiveSimpleUDF udf = init(IntUDF.class, new DataType[]{
DataTypes.INT()});
+ assertEquals(3, (int) udf.eval(3));
+ }
+
+ @Test
+ public void testStringUDF() {
+ HiveSimpleUDF udf = init(StringUDF.class, new DataType[]{
DataTypes.STRING()});
+ assertEquals("test", udf.eval("test"));
+ }
+
+ @Test
public void testUDFRand() {
HiveSimpleUDF udf = init(UDFRand.class, new DataType[0]);
@@ -230,4 +255,40 @@ public class HiveSimpleUDFTest {
return udf;
}
+
+ /**
+ * Boolean Test UDF.
+ */
+ public static class BooleanUDF extends UDF {
+ public boolean evaluate(int content) {
+ return content == 1;
+ }
+ }
+
+ /**
+ * Float Test UDF.
+ */
+ public static class FloatUDF extends UDF {
+ public float evaluate(float content) {
+ return content;
+ }
+ }
+
+ /**
+ * Int Test UDF.
+ */
+ public static class IntUDF extends UDF {
+ public int evaluate(int content) {
+ return content;
+ }
+ }
+
+ /**
+ * String Test UDF.
+ */
+ public static class StringUDF extends UDF {
+ public String evaluate(String content) {
+ return content;
+ }
+ }
}