This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
     new 9a01d11  [FLINK-12875][hive] support converting input args of char, varchar, bytes, timestamp, date for Hive functions
9a01d11 is described below
commit 9a01d1162dbe5fe97dcc89f4db502348c904d150
Author: bowen.li <[email protected]>
AuthorDate: Mon Jun 17 14:42:27 2019 -0700
    [FLINK-12875][hive] support converting input args of char, varchar, bytes, timestamp, date for Hive functions

    This PR adds support for converting input args of char, varchar, bytes, timestamp, date for Hive functions.
This closes #8769.
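At its core, the change threads each argument's Flink DataType into HiveInspectors.getConversion(...) alongside the Hive ObjectInspector, so length-parameterized types like CHAR(n) and VARCHAR(n) convert with their declared length. A minimal sketch of the new call shape in Java (illustrative driver code, not part of the patch; it assumes HiveObjectConversion's single method is toHiveObject(Object), as the lambdas in the diff below suggest):

    // Illustrative: build a conversion for a CHAR(10) argument.
    ObjectInspector inspector = PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(
            TypeInfoFactory.getCharTypeInfo(10));
    DataType dataType = DataTypes.CHAR(10); // supplies the declared length
    HiveObjectConversion conversion = HiveInspectors.getConversion(inspector, dataType);
    Object hiveValue = conversion.toHiveObject("a flink string"); // -> HiveChar bounded to length 10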
---
.../flink/table/functions/hive/HiveSimpleUDF.java | 6 +-
.../functions/hive/conversion/HiveInspectors.java | 119 +++++++++++++++++++--
.../table/functions/hive/HiveSimpleUDFTest.java | 98 ++++++++++++++++-
3 files changed, 210 insertions(+), 13 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveSimpleUDF.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveSimpleUDF.java
index 98c6025..314f4a9 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveSimpleUDF.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveSimpleUDF.java
@@ -87,7 +87,7 @@ public class HiveSimpleUDF extends HiveScalarFunction<UDF> {
		conversionHelper = new GenericUDFUtils.ConversionHelper(method, argInspectors);
		conversions = new HiveObjectConversion[argInspectors.length];
		for (int i = 0; i < argInspectors.length; i++) {
-			conversions[i] = HiveInspectors.getConversion(argInspectors[i]);
+			conversions[i] = HiveInspectors.getConversion(argInspectors[i], argTypes[i]);
		}
		allIdentityConverter = Arrays.stream(conversions)
@@ -109,8 +109,8 @@ public class HiveSimpleUDF extends HiveScalarFunction<UDF> {
		}
		try {
-			return HiveInspectors.toFlinkObject(returnInspector,
-				FunctionRegistry.invoke(method, function, conversionHelper.convertIfNecessary(args)));
+			Object result = FunctionRegistry.invoke(method, function, conversionHelper.convertIfNecessary(args));
+			return HiveInspectors.toFlinkObject(returnInspector, result);
		} catch (HiveException e) {
			throw new FlinkHiveUDFException(e);
		}
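Note: the eval hunk above only splits a nested call so the invoked result is converted back in a separate statement; the per-argument conversions built in the first hunk are applied elsewhere in HiveSimpleUDF, outside this diff. A hypothetical sketch of that apply step:

    // Hypothetical: map each Flink argument through its HiveObjectConversion before invoking the UDF.
    for (int i = 0; i < args.length; i++) {
        args[i] = conversions[i].toHiveObject(args[i]);
    }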
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
index a5d0c45..e9bae47 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
@@ -22,40 +22,62 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
import org.apache.flink.table.functions.hive.FlinkHiveUDFException;
import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.hadoop.hive.common.type.HiveChar;
+import org.apache.hadoop.hive.common.type.HiveVarchar;
import org.apache.hadoop.hive.serde2.io.ByteWritable;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.HiveCharWritable;
+import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable;
import org.apache.hadoop.hive.serde2.io.ShortWritable;
+import org.apache.hadoop.hive.serde2.io.TimestampWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.DateObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveCharObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveVarcharObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaBinaryObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaBooleanObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaByteObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaDateObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaDoubleObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaFloatObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveCharObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveVarcharObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaIntObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaLongObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaShortObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaStringObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaTimestampObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.VoidObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
+import java.sql.Date;
+import java.sql.Timestamp;
+
/**
 * Util for any ObjectInspector related inspection and conversion of Hive data to/from Flink data.
@@ -67,9 +89,9 @@ import org.apache.hadoop.io.Text;
public class HiveInspectors {
	/**
-	 * Get conversion for converting Flink object to Hive object from an ObjectInspector.
+	 * Get conversion for converting Flink object to Hive object from an ObjectInspector and the corresponding Flink DataType.
	 */
-	public static HiveObjectConversion getConversion(ObjectInspector inspector) {
+	public static HiveObjectConversion getConversion(ObjectInspector inspector, DataType dataType) {
		if (inspector instanceof PrimitiveObjectInspector) {
			if (inspector instanceof JavaBooleanObjectInspector) {
				if (((JavaBooleanObjectInspector) inspector).preferWritable()) {
@@ -119,9 +141,45 @@ public class HiveInspectors {
				} else {
					return IdentityConversion.INSTANCE;
				}
+			} else if (inspector instanceof JavaDateObjectInspector) {
+				if (((JavaDateObjectInspector) inspector).preferWritable()) {
+					return o -> new DateWritable((Date) o);
+				} else {
+					return IdentityConversion.INSTANCE;
+				}
+			} else if (inspector instanceof JavaTimestampObjectInspector) {
+				if (((JavaTimestampObjectInspector) inspector).preferWritable()) {
+					return o -> new TimestampWritable((Timestamp) o);
+				} else {
+					return IdentityConversion.INSTANCE;
+				}
+			} else if (inspector instanceof JavaBinaryObjectInspector) {
+				if (((JavaBinaryObjectInspector) inspector).preferWritable()) {
+					return o -> new BytesWritable((byte[]) o);
+				} else {
+					return IdentityConversion.INSTANCE;
+				}
+			} else if (inspector instanceof JavaHiveCharObjectInspector) {
+				if (((JavaHiveCharObjectInspector) inspector).preferWritable()) {
+					return o -> new HiveCharWritable(
+						new HiveChar((String) o, ((CharType) dataType.getLogicalType()).getLength()));
+				} else {
+					return o -> new HiveChar((String) o, ((CharType) dataType.getLogicalType()).getLength());
+				}
+			} else if (inspector instanceof JavaHiveVarcharObjectInspector) {
+				if (((JavaHiveVarcharObjectInspector) inspector).preferWritable()) {
+					return o -> new HiveVarcharWritable(
+						new HiveVarchar((String) o, ((VarCharType) dataType.getLogicalType()).getLength()));
+				} else {
+					return o -> new HiveVarchar((String) o, ((VarCharType) dataType.getLogicalType()).getLength());
+				}
			}
+
+			// TODO: handle decimal type
		}
+		// TODO: handle complex types like struct, list, and map
+
		throw new FlinkHiveUDFException(
			String.format("Flink doesn't support convert object conversion for %s yet", inspector));
	}
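The char/varchar branches above are why getConversion now needs the Flink DataType: Hive's value classes enforce the declared length at construction time. A small standalone illustration of that behavior:

    // HiveChar bounds the value to the declared length (getValue() strips trailing pad spaces);
    // HiveVarchar truncates to its maximum length.
    HiveChar c = new HiveChar("ab", 5);           // CHAR(5), value "ab"
    HiveVarchar v = new HiveVarchar("abcdef", 3); // VARCHAR(3), value "abc"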
@@ -162,34 +220,64 @@ public class HiveInspectors {
			return oi.preferWritable() ?
				oi.get(data) :
-				oi.getPrimitiveWritableObject(data);
+				oi.getPrimitiveJavaObject(data);
		} else if (inspector instanceof IntObjectInspector) {
			IntObjectInspector oi = (IntObjectInspector) inspector;
			return oi.preferWritable() ?
				oi.get(data) :
-				oi.getPrimitiveWritableObject(data);
+				oi.getPrimitiveJavaObject(data);
		} else if (inspector instanceof LongObjectInspector) {
			LongObjectInspector oi = (LongObjectInspector) inspector;
			return oi.preferWritable() ?
				oi.get(data) :
-				oi.getPrimitiveWritableObject(data);
+				oi.getPrimitiveJavaObject(data);
		} else if (inspector instanceof FloatObjectInspector) {
			FloatObjectInspector oi = (FloatObjectInspector) inspector;
			return oi.preferWritable() ?
				oi.get(data) :
-				oi.getPrimitiveWritableObject(data);
+				oi.getPrimitiveJavaObject(data);
		} else if (inspector instanceof DoubleObjectInspector) {
			DoubleObjectInspector oi = (DoubleObjectInspector) inspector;
			return oi.preferWritable() ?
				oi.get(data) :
-				oi.getPrimitiveWritableObject(data);
+				oi.getPrimitiveJavaObject(data);
+		} else if (inspector instanceof DateObjectInspector) {
+			DateObjectInspector oi = (DateObjectInspector) inspector;
+
+			return oi.preferWritable() ?
+				oi.getPrimitiveWritableObject(data).get() :
+				oi.getPrimitiveJavaObject(data);
+		} else if (inspector instanceof TimestampObjectInspector) {
+			TimestampObjectInspector oi = (TimestampObjectInspector) inspector;
+
+			return oi.preferWritable() ?
+				oi.getPrimitiveWritableObject(data).getTimestamp() :
+				oi.getPrimitiveJavaObject(data);
+		} else if (inspector instanceof BinaryObjectInspector) {
+			BinaryObjectInspector oi = (BinaryObjectInspector) inspector;
+
+			return oi.preferWritable() ?
+				oi.getPrimitiveWritableObject(data).getBytes() :
+				oi.getPrimitiveJavaObject(data);
+		} else if (inspector instanceof HiveCharObjectInspector) {
+			HiveCharObjectInspector oi = (HiveCharObjectInspector) inspector;
+
+			return oi.preferWritable() ?
+				oi.getPrimitiveWritableObject(data).getHiveChar().getValue() :
+				oi.getPrimitiveJavaObject(data).getValue();
+		} else if (inspector instanceof HiveVarcharObjectInspector) {
+			HiveVarcharObjectInspector oi = (HiveVarcharObjectInspector) inspector;
+
+			return oi.preferWritable() ?
+				oi.getPrimitiveWritableObject(data).getHiveVarchar().getValue() :
+				oi.getPrimitiveJavaObject(data).getValue();
		}
-		// TODO: handle more primitive types like char, varchar, timestamp, date, decimal
+		// TODO: handle decimal type
	}

	// TODO: handle complex types like struct, list, and map
@@ -225,6 +313,21 @@ public class HiveInspectors {
		} else if (clazz.equals(Double.class) || clazz.equals(DoubleWritable.class)) {
			typeInfo = TypeInfoFactory.doubleTypeInfo;
+		} else if (clazz.equals(Date.class) || clazz.equals(DateWritable.class)) {
+			typeInfo = TypeInfoFactory.dateTypeInfo;
+		} else if (clazz.equals(Timestamp.class) || clazz.equals(TimestampWritable.class)) {
+			typeInfo = TypeInfoFactory.timestampTypeInfo;
+		} else if (clazz.equals(byte[].class) || clazz.equals(BytesWritable.class)) {
+			typeInfo = TypeInfoFactory.binaryTypeInfo;
+		} else if (clazz.equals(HiveChar.class)) {
+			typeInfo = TypeInfoFactory.charTypeInfo;
+		} else if (clazz.equals(HiveVarchar.class)) {
+			typeInfo = TypeInfoFactory.varcharTypeInfo;
		} else {
			throw new FlinkHiveUDFException(
				String.format("Class %s is not supported yet", clazz.getName()));
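Taken together, the two halves round-trip: getConversion(...) turns Flink values into Hive values before the UDF runs, and toFlinkObject(...) maps the UDF's result back. A hypothetical round-trip for a timestamp result, using Hive's public inspector factory (illustrative, not from the patch):

    // A writable-preferring timestamp inspector now unwraps back to java.sql.Timestamp.
    TimestampObjectInspector oi = PrimitiveObjectInspectorFactory.writableTimestampObjectInspector;
    Object back = HiveInspectors.toFlinkObject(
            oi, new TimestampWritable(Timestamp.valueOf("1969-07-20 20:17:40")));
    // back is the java.sql.Timestamp for 1969-07-20 20:17:40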
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveSimpleUDFTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveSimpleUDFTest.java
index 25c331f..afb3341 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveSimpleUDFTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveSimpleUDFTest.java
@@ -21,13 +21,21 @@ package org.apache.flink.table.functions.hive;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
+import org.apache.hadoop.hive.ql.udf.UDFBase64;
import org.apache.hadoop.hive.ql.udf.UDFBin;
+import org.apache.hadoop.hive.ql.udf.UDFConv;
import org.apache.hadoop.hive.ql.udf.UDFJson;
import org.apache.hadoop.hive.ql.udf.UDFMinute;
import org.apache.hadoop.hive.ql.udf.UDFRand;
+import org.apache.hadoop.hive.ql.udf.UDFRegExpExtract;
+import org.apache.hadoop.hive.ql.udf.UDFUnhex;
import org.apache.hadoop.hive.ql.udf.UDFWeekOfYear;
import org.junit.Test;
+import java.io.UnsupportedEncodingException;
+import java.sql.Date;
+import java.sql.Timestamp;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -35,7 +43,6 @@ import static org.junit.Assert.assertTrue;
* Test for {@link HiveSimpleUDF}.
*/
public class HiveSimpleUDFTest {
-
@Test
public void testUDFRand() {
HiveSimpleUDF udf = init(UDFRand.class, new DataType[0]);
@@ -53,7 +60,25 @@ public class HiveSimpleUDFTest {
}
@Test
+ public void testUDFConv() {
+ HiveSimpleUDF udf = init(
+ UDFConv.class,
+ new DataType[]{
+ DataTypes.STRING(),
+ DataTypes.INT(),
+ DataTypes.INT()
+ });
+
+ assertEquals("1", udf.eval("12", 2, 10));
+ assertEquals("-16", udf.eval(-10, 16, -10));
+ }
+
+ @Test
public void testUDFJson() {
+ String pattern = "$.owner";
+ String json = "{\"store\": \"test\", \"owner\": \"amy\"}";
+ String expected = "amy";
+
HiveSimpleUDF udf = init(
UDFJson.class,
new DataType[]{
@@ -61,7 +86,36 @@ public class HiveSimpleUDFTest {
DataTypes.STRING()
});
- assertEquals("amy", udf.eval("{\"store\": \"test\", \"owner\":
\"amy\"}", "$.owner"));
+ assertEquals(expected, udf.eval(json, pattern));
+
+ udf = init(
+ UDFJson.class,
+ new DataType[]{
+ DataTypes.CHAR(100),
+ DataTypes.CHAR(pattern.length())
+ });
+
+ assertEquals(expected, udf.eval(json, pattern));
+
+ udf = init(
+ UDFJson.class,
+ new DataType[]{
+ DataTypes.VARCHAR(100),
+ DataTypes.VARCHAR(pattern.length())
+ });
+
+ assertEquals(expected, udf.eval(json, pattern));
+
+ // Test invalid CHAR length
+ udf = init(
+ UDFJson.class,
+ new DataType[]{
+ DataTypes.CHAR(100),
+				DataTypes.CHAR(pattern.length() - 1) // shorter than pattern's length by 1
+ });
+
+ // Cannot find path "$.owne"
+ assertEquals(null, udf.eval(json, pattern));
}
@Test
@@ -73,6 +127,8 @@ public class HiveSimpleUDFTest {
});
assertEquals(17, udf.eval("1969-07-20 20:17:40"));
+		assertEquals(17, udf.eval(Timestamp.valueOf("1969-07-20 20:17:40")));
+ assertEquals(58, udf.eval("12:58:59"));
}
@Test
@@ -84,6 +140,44 @@ public class HiveSimpleUDFTest {
});
assertEquals(29, udf.eval("1969-07-20"));
+ assertEquals(29, udf.eval(Date.valueOf("1969-07-20")));
+		assertEquals(29, udf.eval(Timestamp.valueOf("1969-07-20 00:00:00")));
+ assertEquals(1, udf.eval("1980-12-31 12:59:59"));
+ }
+
+ @Test
+ public void testUDFRegExpExtract() {
+ HiveSimpleUDF udf = init(
+ UDFRegExpExtract.class,
+ new DataType[]{
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.INT()
+ });
+
+ assertEquals("100", udf.eval("100-200", "(\\d+)-(\\d+)", 1));
+ }
+
+ @Test
+	public void testUDFBase64() {
+ HiveSimpleUDF udf = init(
+ UDFBase64.class,
+ new DataType[]{
+ DataTypes.BYTES()
+ });
+
+ assertEquals("Cg==", udf.eval(new byte[] {10}));
+ }
+
+ @Test
+ public void testUDFUnhex() throws UnsupportedEncodingException {
+ HiveSimpleUDF udf = init(
+ UDFUnhex.class,
+ new DataType[]{
+ DataTypes.STRING()
+ });
+
+ assertEquals("MySQL", new String((byte[])
udf.eval("4D7953514C"), "UTF-8"));
}
private HiveSimpleUDF init(Class hiveUdfClass, DataType[] argTypes) {
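The diff is truncated here; the body of init(Class, DataType[]) is not shown in this mail. For reading the tests above it suffices to know the pattern: init builds a HiveSimpleUDF for the given UDF class and argument DataTypes, and eval then runs each argument through the conversions added in this commit. In miniature, and purely illustratively:

    // Illustrative reading of the test pattern; init(...) is the truncated helper above.
    HiveSimpleUDF udf = init(UDFWeekOfYear.class, new DataType[]{DataTypes.DATE()});
    assertEquals(29, udf.eval(Date.valueOf("1969-07-20"))); // a java.sql.Date arg now converts cleanly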