This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d561a6f  [FLINK-13110][hive] add shim of SimpleGenericUDAFParameterInfo for Hive 1.2.1 and 2.3.4
d561a6f is described below

commit d561a6f8ef7c494c575fbe179f41614a7d73ca43
Author: Rui Li <li...@apache.org>
AuthorDate: Fri Jul 5 17:30:17 2019 +0800

    [FLINK-13110][hive] add shim of SimpleGenericUDAFParameterInfo for Hive 1.2.1 and 2.3.4
    
    This PR adds a shim of SimpleGenericUDAFParameterInfo for Hive 1.2.1 and 2.3.4.
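    
    For context: the SimpleGenericUDAFParameterInfo constructor takes different
    arguments in the two supported Hive versions, which is why callers must go
    through the shim instead of invoking the constructor directly. A sketch of
    the two signatures, as reconstructed from the reflective lookups in the
    diff below (not an authoritative listing of the Hive API):
    
        // Hive 1.2.1: no isWindowing flag
        SimpleGenericUDAFParameterInfo(ObjectInspector[] params,
                boolean distinct, boolean allColumns)
        // Hive 2.3.4: adds the isWindowing flag
        SimpleGenericUDAFParameterInfo(ObjectInspector[] params,
                boolean isWindowing, boolean distinct, boolean allColumns)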
    
    This closes #9001.
---
 .../apache/flink/table/catalog/hive/client/HiveShim.java  |  8 ++++++++
 .../flink/table/catalog/hive/client/HiveShimV1.java       | 14 ++++++++++++++
 .../flink/table/catalog/hive/client/HiveShimV2.java       | 14 ++++++++++++++
 .../flink/table/functions/hive/HiveGenericUDAF.java       | 15 ++++++++++-----
 .../table/functions/hive/conversion/HiveInspectors.java   |  4 ++++
 .../flink/table/functions/hive/HiveGenericUDAFTest.java   |  3 ++-
 6 files changed, 52 insertions(+), 6 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java
index 320f078..812ece1 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java
@@ -29,6 +29,8 @@ import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
+import org.apache.hadoop.hive.ql.udf.generic.SimpleGenericUDAFParameterInfo;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.thrift.TException;
 
 import java.io.IOException;
@@ -97,4 +99,10 @@ public interface HiveShim {
         */
        void alterTable(IMetaStoreClient client, String databaseName, String tableName, Table table)
                        throws InvalidOperationException, MetaException, TException;
+
+       /**
+        * Creates SimpleGenericUDAFParameterInfo.
+        */
+       SimpleGenericUDAFParameterInfo createUDAFParameterInfo(ObjectInspector[] params, boolean isWindowing,
+                       boolean distinct, boolean allColumns);
 }
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV1.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV1.java
index 830d6e6..8733100 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV1.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV1.java
@@ -34,9 +34,12 @@ import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
+import org.apache.hadoop.hive.ql.udf.generic.SimpleGenericUDAFParameterInfo;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.thrift.TException;
 
 import java.io.IOException;
+import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
@@ -105,4 +108,15 @@ public class HiveShimV1 implements HiveShim {
                table.getParameters().put(StatsSetupConst.DO_NOT_UPDATE_STATS, "true");
                client.alter_table(databaseName, tableName, table);
        }
+
+       @Override
+       public SimpleGenericUDAFParameterInfo createUDAFParameterInfo(ObjectInspector[] params, boolean isWindowing, boolean distinct, boolean allColumns) {
+               try {
+                       Constructor constructor = SimpleGenericUDAFParameterInfo.class.getConstructor(ObjectInspector[].class,
+                                       boolean.class, boolean.class);
+                       return (SimpleGenericUDAFParameterInfo) constructor.newInstance(params, distinct, allColumns);
+               } catch (NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException e) {
+                       throw new CatalogException("Failed to create SimpleGenericUDAFParameterInfo", e);
+               }
+       }
 }
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV2.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV2.java
index 7df0aaf..2510497 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV2.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV2.java
@@ -34,9 +34,12 @@ import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
+import org.apache.hadoop.hive.ql.udf.generic.SimpleGenericUDAFParameterInfo;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.thrift.TException;
 
 import java.io.IOException;
+import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.List;
@@ -95,4 +98,15 @@ public class HiveShimV2 implements HiveShim {
                // For Hive-2.3.4, we don't need to tell HMS not to update stats.
                client.alter_table(databaseName, tableName, table);
        }
+
+       @Override
+       public SimpleGenericUDAFParameterInfo createUDAFParameterInfo(ObjectInspector[] params, boolean isWindowing, boolean distinct, boolean allColumns) {
+               try {
+                       Constructor constructor = SimpleGenericUDAFParameterInfo.class.getConstructor(ObjectInspector[].class,
+                                       boolean.class, boolean.class, boolean.class);
+                       return (SimpleGenericUDAFParameterInfo) constructor.newInstance(params, isWindowing, distinct, allColumns);
+               } catch (NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException e) {
+                       throw new CatalogException("Failed to create SimpleGenericUDAFParameterInfo", e);
+               }
+       }
 }
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java
index 163fc06..5782455 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java
@@ -20,6 +20,8 @@ package org.apache.flink.table.functions.hive;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.catalog.hive.client.HiveShim;
+import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
 import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
 import org.apache.flink.table.functions.AggregateFunction;
 import org.apache.flink.table.functions.FunctionContext;
@@ -35,7 +37,6 @@ import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBridge;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver2;
-import org.apache.hadoop.hive.ql.udf.generic.SimpleGenericUDAFParameterInfo;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 
 import java.util.Arrays;
@@ -62,13 +63,16 @@ public class HiveGenericUDAF
        private transient boolean allIdentityConverter;
        private transient boolean initialized;
 
-       public HiveGenericUDAF(HiveFunctionWrapper funcWrapper) {
-               this(funcWrapper, false);
+       private final String hiveVersion;
+
+       public HiveGenericUDAF(HiveFunctionWrapper funcWrapper, String hiveVersion) {
+               this(funcWrapper, false, hiveVersion);
        }
 
-       public HiveGenericUDAF(HiveFunctionWrapper funcWrapper, boolean isUDAFBridgeRequired) {
+       public HiveGenericUDAF(HiveFunctionWrapper funcWrapper, boolean isUDAFBridgeRequired, String hiveVersion) {
                this.hiveFunctionWrapper = funcWrapper;
                this.isUDAFBridgeRequired = isUDAFBridgeRequired;
+               this.hiveVersion = hiveVersion;
        }
 
        @Override
@@ -112,8 +116,9 @@ public class HiveGenericUDAF
                        resolver = (GenericUDAFResolver2) hiveFunctionWrapper.createFunction();
                }
 
+               HiveShim hiveShim = HiveShimLoader.loadHiveShim(hiveVersion);
                return resolver.getEvaluator(
-                       new SimpleGenericUDAFParameterInfo(
+                       hiveShim.createUDAFParameterInfo(
                                inputInspectors,
                                // The flag to indicate if the UDAF invocation was from the windowing function call or not.
                                // TODO: investigate whether this has impact on Flink streaming job with windows
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
index 0f9dd67..85ca033 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
@@ -338,6 +338,10 @@ public class HiveInspectors {
                        List<? extends StructField> fields = structInspector.getAllStructFieldRefs();
 
                        Row row = new Row(fields.size());
+                       // StandardStructObjectInspector.getStructFieldData in Hive-1.2.1 only accepts array or list as data
+                       if (!data.getClass().isArray() && !(data instanceof List)) {
+                               data = new Object[]{data};
+                       }
                        for (int i = 0; i < row.getArity(); i++) {
                                row.setField(
                                        i,
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDAFTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDAFTest.java
index df09107..2945523 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDAFTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDAFTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.functions.hive;
 
 import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
 import org.apache.flink.table.types.DataType;
 
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCount;
@@ -129,7 +130,7 @@ public class HiveGenericUDAFTest {
        private static HiveGenericUDAF init(Class hiveUdfClass, Object[] constantArgs, DataType[] argTypes) throws Exception {
                HiveFunctionWrapper<GenericUDAFResolver2> wrapper = new HiveFunctionWrapper(hiveUdfClass.getName());
 
-               HiveGenericUDAF udf = new HiveGenericUDAF(wrapper);
+               HiveGenericUDAF udf = new HiveGenericUDAF(wrapper, HiveShimLoader.getHiveVersion());
 
                udf.setArgumentTypesAndConstants(constantArgs, argTypes);
                udf.getHiveResultType(constantArgs, argTypes);
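
For reference, a minimal sketch of how the new shim entry point is called
(mirroring the HiveGenericUDAF change above; inputInspectors and resolver are
the names used in the diff, and the boolean arguments here are illustrative
placeholders, not necessarily the values passed in the actual code):

    // Load the shim matching the detected Hive version, then build the
    // parameter info through it rather than via the constructor directly.
    HiveShim hiveShim = HiveShimLoader.loadHiveShim(HiveShimLoader.getHiveVersion());
    SimpleGenericUDAFParameterInfo info = hiveShim.createUDAFParameterInfo(
            inputInspectors, // ObjectInspector[] describing the call arguments
            false,           // isWindowing (illustrative)
            false,           // distinct (illustrative)
            false);          // allColumns (illustrative)
    GenericUDAFEvaluator evaluator = resolver.getEvaluator(info);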
