This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new d561a6f [FLINK-13110][hive] add shim of SimpleGenericUDAFParameterInfo for Hive 1.2.1 and 2.3.4 d561a6f is described below commit d561a6f8ef7c494c575fbe179f41614a7d73ca43 Author: Rui Li <li...@apache.org> AuthorDate: Fri Jul 5 17:30:17 2019 +0800 [FLINK-13110][hive] add shim of SimpleGenericUDAFParameterInfo for Hive 1.2.1 and 2.3.4 This PR adds shim of SimpleGenericUDAFParameterInfo for Hive 1.2.1 and 2.3.4. This closes #9001. --- .../apache/flink/table/catalog/hive/client/HiveShim.java | 8 ++++++++ .../flink/table/catalog/hive/client/HiveShimV1.java | 14 ++++++++++++++ .../flink/table/catalog/hive/client/HiveShimV2.java | 14 ++++++++++++++ .../flink/table/functions/hive/HiveGenericUDAF.java | 15 ++++++++++----- .../table/functions/hive/conversion/HiveInspectors.java | 4 ++++ .../flink/table/functions/hive/HiveGenericUDAFTest.java | 3 ++- 6 files changed, 52 insertions(+), 6 deletions(-) diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java index 320f078..812ece1 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java @@ -29,6 +29,8 @@ import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.UnknownDBException; +import org.apache.hadoop.hive.ql.udf.generic.SimpleGenericUDAFParameterInfo; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.thrift.TException; import java.io.IOException; @@ -97,4 +99,10 @@ public interface HiveShim { */ void alterTable(IMetaStoreClient client, String databaseName, String tableName, Table table) throws InvalidOperationException, MetaException, TException; + + /** + * Creates SimpleGenericUDAFParameterInfo. + */ + SimpleGenericUDAFParameterInfo createUDAFParameterInfo(ObjectInspector[] params, boolean isWindowing, + boolean distinct, boolean allColumns); } diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV1.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV1.java index 830d6e6..8733100 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV1.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV1.java @@ -34,9 +34,12 @@ import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.UnknownDBException; +import org.apache.hadoop.hive.ql.udf.generic.SimpleGenericUDAFParameterInfo; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.thrift.TException; import java.io.IOException; +import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.ArrayList; @@ -105,4 +108,15 @@ public class HiveShimV1 implements HiveShim { table.getParameters().put(StatsSetupConst.DO_NOT_UPDATE_STATS, "true"); client.alter_table(databaseName, tableName, table); } + + @Override + public SimpleGenericUDAFParameterInfo createUDAFParameterInfo(ObjectInspector[] params, boolean isWindowing, boolean distinct, boolean allColumns) { + try { + Constructor constructor = SimpleGenericUDAFParameterInfo.class.getConstructor(ObjectInspector[].class, + boolean.class, boolean.class); + return (SimpleGenericUDAFParameterInfo) constructor.newInstance(params, distinct, allColumns); + } catch (NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException e) { + throw new CatalogException("Failed to create SimpleGenericUDAFParameterInfo", e); + } + } } diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV2.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV2.java index 7df0aaf..2510497 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV2.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV2.java @@ -34,9 +34,12 @@ import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.UnknownDBException; +import org.apache.hadoop.hive.ql.udf.generic.SimpleGenericUDAFParameterInfo; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.thrift.TException; import java.io.IOException; +import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.List; @@ -95,4 +98,15 @@ public class HiveShimV2 implements HiveShim { // For Hive-2.3.4, we don't need to tell HMS not to update stats. client.alter_table(databaseName, tableName, table); } + + @Override + public SimpleGenericUDAFParameterInfo createUDAFParameterInfo(ObjectInspector[] params, boolean isWindowing, boolean distinct, boolean allColumns) { + try { + Constructor constructor = SimpleGenericUDAFParameterInfo.class.getConstructor(ObjectInspector[].class, + boolean.class, boolean.class, boolean.class); + return (SimpleGenericUDAFParameterInfo) constructor.newInstance(params, isWindowing, distinct, allColumns); + } catch (NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException e) { + throw new CatalogException("Failed to create SimpleGenericUDAFParameterInfo", e); + } + } } diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java index 163fc06..5782455 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java @@ -20,6 +20,8 @@ package org.apache.flink.table.functions.hive; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.catalog.hive.client.HiveShim; +import org.apache.flink.table.catalog.hive.client.HiveShimLoader; import org.apache.flink.table.catalog.hive.util.HiveTypeUtil; import org.apache.flink.table.functions.AggregateFunction; import org.apache.flink.table.functions.FunctionContext; @@ -35,7 +37,6 @@ import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBridge; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver2; -import org.apache.hadoop.hive.ql.udf.generic.SimpleGenericUDAFParameterInfo; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import java.util.Arrays; @@ -62,13 +63,16 @@ public class HiveGenericUDAF private transient boolean allIdentityConverter; private transient boolean initialized; - public HiveGenericUDAF(HiveFunctionWrapper funcWrapper) { - this(funcWrapper, false); + private final String hiveVersion; + + public HiveGenericUDAF(HiveFunctionWrapper funcWrapper, String hiveVersion) { + this(funcWrapper, false, hiveVersion); } - public HiveGenericUDAF(HiveFunctionWrapper funcWrapper, boolean isUDAFBridgeRequired) { + public HiveGenericUDAF(HiveFunctionWrapper funcWrapper, boolean isUDAFBridgeRequired, String hiveVersion) { this.hiveFunctionWrapper = funcWrapper; this.isUDAFBridgeRequired = isUDAFBridgeRequired; + this.hiveVersion = hiveVersion; } @Override @@ -112,8 +116,9 @@ public class HiveGenericUDAF resolver = (GenericUDAFResolver2) hiveFunctionWrapper.createFunction(); } + HiveShim hiveShim = HiveShimLoader.loadHiveShim(hiveVersion); return resolver.getEvaluator( - new SimpleGenericUDAFParameterInfo( + hiveShim.createUDAFParameterInfo( inputInspectors, // The flag to indicate if the UDAF invocation was from the windowing function call or not. // TODO: investigate whether this has impact on Flink streaming job with windows diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java index 0f9dd67..85ca033 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java @@ -338,6 +338,10 @@ public class HiveInspectors { List<? extends StructField> fields = structInspector.getAllStructFieldRefs(); Row row = new Row(fields.size()); + // StandardStructObjectInspector.getStructFieldData in Hive-1.2.1 only accepts array or list as data + if (!data.getClass().isArray() && !(data instanceof List)) { + data = new Object[]{data}; + } for (int i = 0; i < row.getArity(); i++) { row.setField( i, diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDAFTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDAFTest.java index df09107..2945523 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDAFTest.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDAFTest.java @@ -19,6 +19,7 @@ package org.apache.flink.table.functions.hive; import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.catalog.hive.client.HiveShimLoader; import org.apache.flink.table.types.DataType; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCount; @@ -129,7 +130,7 @@ public class HiveGenericUDAFTest { private static HiveGenericUDAF init(Class hiveUdfClass, Object[] constantArgs, DataType[] argTypes) throws Exception { HiveFunctionWrapper<GenericUDAFResolver2> wrapper = new HiveFunctionWrapper(hiveUdfClass.getName()); - HiveGenericUDAF udf = new HiveGenericUDAF(wrapper); + HiveGenericUDAF udf = new HiveGenericUDAF(wrapper, HiveShimLoader.getHiveVersion()); udf.setArgumentTypesAndConstants(constantArgs, argTypes); udf.getHiveResultType(constantArgs, argTypes);