This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a0b9c7  [FLINK-12660][hive] Integrate Flink with Hive UDAF
8a0b9c7 is described below

commit 8a0b9c795752696f595fc75e10128fc47acfb066
Author: bowen.li <[email protected]>
AuthorDate: Tue Jun 25 13:28:39 2019 -0700

    [FLINK-12660][hive] Integrate Flink with Hive UDAF
    
    This PR adds support for Hive UDAF in Flink.
    
    For Hive UDAF, there are mainly four interfaces: UDAF, GenericUDAFResolver,
    GenericUDAFResolver2, and AbstractGenericUDAFResolver.
    
    UDAF stands on its own and was deprecated in Hive a long time ago.
    GenericUDAFResolver and AbstractGenericUDAFResolver are deprecated in favor
    of GenericUDAFResolver2.
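
    For illustration, a minimal usage sketch mirroring the new unit test
    HiveGenericUDAFTest (all names come from this patch; error handling omitted):

        HiveFunctionWrapper<GenericUDAFResolver2> wrapper =
                new HiveFunctionWrapper<>(GenericUDAFSum.class.getName());
        HiveGenericUDAF udaf = new HiveGenericUDAF(wrapper);
        udaf.setArgumentTypesAndConstants(
                new Object[] { null }, new DataType[] { DataTypes.DOUBLE() });
        udaf.open(null);

        GenericUDAFEvaluator.AggregationBuffer acc = udaf.createAccumulator();
        udaf.accumulate(acc, 1.5d);
        udaf.accumulate(acc, 2.5d);
        udaf.merge(acc, Arrays.asList());
        Object result = udaf.getValue(acc);  // 4.0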
    
    This closes #8881.
---
 .../table/functions/hive/HiveGenericUDAF.java      | 215 +++++++++++++++++++++
 .../table/functions/hive/HiveGenericUDAFTest.java  | 121 ++++++++++++
 2 files changed, 336 insertions(+)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java
new file mode 100644
index 0000000..163fc06
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.hive;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.hive.conversion.HiveInspectors;
+import org.apache.flink.table.functions.hive.conversion.HiveObjectConversion;
+import org.apache.flink.table.functions.hive.conversion.IdentityConversion;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter;
+
+import org.apache.hadoop.hive.ql.exec.UDAF;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBridge;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver2;
+import org.apache.hadoop.hive.ql.udf.generic.SimpleGenericUDAFParameterInfo;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+
+import java.util.Arrays;
+
+/**
+ * An {@link AggregateFunction} implementation that calls Hive's {@link UDAF} or {@link GenericUDAFEvaluator}.
+ */
+@Internal
+public class HiveGenericUDAF
+       extends AggregateFunction<Object, GenericUDAFEvaluator.AggregationBuffer> implements HiveFunction {
+
+       private final HiveFunctionWrapper hiveFunctionWrapper;
+       // Flag that indicates whether a bridge between GenericUDAF and UDAF is required.
+       // Old UDAF can be used with the GenericUDAF infrastructure through bridging.
+       private final boolean isUDAFBridgeRequired;
+
+       private Object[] constantArguments;
+       private DataType[] argTypes;
+
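+       // Two evaluator instances are kept: one running in PARTIAL1 mode
+       // (iterate/terminatePartial) and one running in FINAL mode (merge/terminate).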
+       private transient GenericUDAFEvaluator partialEvaluator;
+       private transient GenericUDAFEvaluator finalEvaluator;
+       private transient ObjectInspector finalResult;
+       private transient HiveObjectConversion[] conversions;
+       private transient boolean allIdentityConverter;
+       private transient boolean initialized;
+
+       public HiveGenericUDAF(HiveFunctionWrapper funcWrapper) {
+               this(funcWrapper, false);
+       }
+
+       public HiveGenericUDAF(HiveFunctionWrapper funcWrapper, boolean isUDAFBridgeRequired) {
+               this.hiveFunctionWrapper = funcWrapper;
+               this.isUDAFBridgeRequired = isUDAFBridgeRequired;
+       }
+
+       @Override
+       public void open(FunctionContext context) throws Exception {
+               super.open(context);
+               init();
+       }
+
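+       /**
+        * Sets up the PARTIAL1 and FINAL evaluators and the Flink-to-Hive object
+        * conversions for the declared argument types. Kept separate from open()
+        * because createAccumulator() may be called before open().
+        */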
+       private void init() throws HiveException {
+               ObjectInspector[] inputInspectors = HiveInspectors.toInspectors(constantArguments, argTypes);
+
+               // Flink UDAF only supports Hive UDAF's PARTIAL1 and FINAL modes
+
+               // PARTIAL1: from original data to partial aggregation data:
+               //              iterate() and terminatePartial() will be called.
+               this.partialEvaluator = createEvaluator(inputInspectors);
+               ObjectInspector partialResult = partialEvaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, inputInspectors);
+
+               // FINAL: from partial aggregation to full aggregation:
+               //              merge() and terminate() will be called.
+               this.finalEvaluator = createEvaluator(inputInspectors);
+               this.finalResult = finalEvaluator.init(
+                       GenericUDAFEvaluator.Mode.FINAL, new ObjectInspector[]{ partialResult });
+
+               conversions = new HiveObjectConversion[inputInspectors.length];
+               for (int i = 0; i < inputInspectors.length; i++) {
+                       conversions[i] = HiveInspectors.getConversion(inputInspectors[i], argTypes[i].getLogicalType());
+               }
+               allIdentityConverter = Arrays.stream(conversions)
+                       .allMatch(conv -> conv instanceof IdentityConversion);
+
+               initialized = true;
+       }
+
+       private GenericUDAFEvaluator createEvaluator(ObjectInspector[] inputInspectors) throws SemanticException {
+               GenericUDAFResolver2 resolver;
+
+               if (isUDAFBridgeRequired) {
+                       resolver = new GenericUDAFBridge((UDAF) hiveFunctionWrapper.createFunction());
+               } else {
+                       resolver = (GenericUDAFResolver2) hiveFunctionWrapper.createFunction();
+               }
+
+               return resolver.getEvaluator(
+                       new SimpleGenericUDAFParameterInfo(
+                               inputInspectors,
+                               // The flag to indicate if the UDAF invocation was from a windowing function call or not.
+                               // TODO: investigate whether this has impact on Flink streaming jobs with windows
+                               Boolean.FALSE,
+                               // True if the UDAF invocation was qualified with the DISTINCT keyword.
+                               // Note that this is provided for informational purposes only; the function implementation
+                               // is not expected to ensure the distinct property for the parameter values.
+                               // That is handled by the framework.
+                               Boolean.FALSE,
+                               // True if the UDAF invocation was done via the wildcard syntax FUNCTION(*).
+                               // Note that this is provided for informational purposes only; the function implementation
+                               // is not expected to ensure the wildcard handling of the target relation.
+                               // That is handled by the framework.
+                               Boolean.FALSE));
+       }
+
+       /**
+        * This is invoked without calling open() in Blink, so we need to call init() for getNewAggregationBuffer().
+        * TODO: re-evaluate how this will fit into Flink's new type inference and UDF system.
+        */
+       @Override
+       public GenericUDAFEvaluator.AggregationBuffer createAccumulator() {
+               try {
+                       if (!initialized) {
+                               init();
+                       }
+                       return partialEvaluator.getNewAggregationBuffer();
+               } catch (Exception e) {
+                       throw new FlinkHiveUDFException(
+                               String.format("Failed to create accumulator for %s", hiveFunctionWrapper.getClassName()), e);
+               }
+       }
+
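+       /**
+        * Accumulates one input row: converts Flink objects to Hive objects where a
+        * non-identity conversion is needed, then feeds the row to the PARTIAL1
+        * evaluator via iterate().
+        */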
+       public void accumulate(GenericUDAFEvaluator.AggregationBuffer acc, Object... inputs) throws HiveException {
+               if (!allIdentityConverter) {
+                       for (int i = 0; i < inputs.length; i++) {
+                               inputs[i] = conversions[i].toHiveObject(inputs[i]);
+                       }
+               }
+
+               partialEvaluator.iterate(acc, inputs);
+       }
+
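+       /**
+        * Merges partial results: each buffer is serialized by the PARTIAL1
+        * evaluator's terminatePartial() and then merged into the accumulator by the
+        * FINAL evaluator.
+        */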
+       public void merge(
+                       GenericUDAFEvaluator.AggregationBuffer accumulator,
+                       Iterable<GenericUDAFEvaluator.AggregationBuffer> its) throws HiveException {
+
+               for (GenericUDAFEvaluator.AggregationBuffer buffer : its) {
+                       finalEvaluator.merge(
+                               accumulator, partialEvaluator.terminatePartial(buffer));
+               }
+       }
+
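+       /**
+        * Computes the final result by calling terminate() on the FINAL evaluator
+        * and converting the returned Hive object back to a Flink object.
+        */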
+       @Override
+       public Object getValue(GenericUDAFEvaluator.AggregationBuffer accumulator) {
+               try {
+                       return HiveInspectors.toFlinkObject(finalResult, finalEvaluator.terminate(accumulator));
+               } catch (HiveException e) {
+                       throw new FlinkHiveUDFException(
+                               String.format("Failed to get final result on %s", hiveFunctionWrapper.getClassName()), e);
+               }
+       }
+
+       @Override
+       public void setArgumentTypesAndConstants(Object[] constantArguments, DataType[] argTypes) {
+               this.constantArguments = constantArguments;
+               this.argTypes = argTypes;
+       }
+
+       @Override
+       public DataType getHiveResultType(Object[] constantArguments, DataType[] argTypes) {
+               try {
+                       ObjectInspector[] inputs = HiveInspectors.toInspectors(constantArguments, argTypes);
+                       GenericUDAFEvaluator evaluator = createEvaluator(inputs);
+
+                       // The ObjectInspector for the parameters:
+                       // In PARTIAL1 mode, the parameters are original data;
+                       // In FINAL mode, the parameters are just partial aggregations
+                       // (in that case, the array will always have a single element).
+
+                       ObjectInspector partialObjectInspector = evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, inputs);
+
+                       ObjectInspector finalObjectInspector = evaluator.init(
+                               GenericUDAFEvaluator.Mode.FINAL,
+                               new ObjectInspector[]{ partialObjectInspector });
+
+                       return HiveTypeUtil.toFlinkType(finalObjectInspector);
+               } catch (Exception e) {
+                       throw new FlinkHiveUDFException(
+                               String.format("Failed to get Hive result type from %s", hiveFunctionWrapper.getClassName()), e);
+               }
+       }
+
+       @Override
+       public TypeInformation getResultType() {
+               return LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(
+                       getHiveResultType(this.constantArguments, this.argTypes));
+       }
+}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDAFTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDAFTest.java
new file mode 100644
index 0000000..94dd0fd
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDAFTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.hive;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.DataType;
+
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCount;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMin;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver2;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFSum;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test for {@link HiveGenericUDAF}.
+ */
+public class HiveGenericUDAFTest {
+       @Test
+       public void testUDAFMin() throws Exception {
+               Object[] constantArgs = new Object[] {
+                       null
+               };
+
+               DataType[] argTypes = new DataType[] {
+                       DataTypes.BIGINT()
+               };
+
+               HiveGenericUDAF udf = init(GenericUDAFMin.class, constantArgs, argTypes);
+
+               GenericUDAFEvaluator.AggregationBuffer acc = udf.createAccumulator();
+
+               udf.accumulate(acc, 2L);
+               udf.accumulate(acc, 3L);
+               udf.accumulate(acc, 1L);
+
+               udf.merge(acc, Arrays.asList());
+
+               assertEquals(1L, udf.getValue(acc));
+       }
+
+       @Test
+       public void testUDAFSum() throws Exception {
+               Object[] constantArgs = new Object[] {
+                       null
+               };
+
+               DataType[] argTypes = new DataType[] {
+                       DataTypes.DOUBLE()
+               };
+
+               HiveGenericUDAF udf = init(GenericUDAFSum.class, constantArgs, argTypes);
+
+               GenericUDAFEvaluator.AggregationBuffer acc = udf.createAccumulator();
+
+               udf.accumulate(acc, 0.5d);
+               udf.accumulate(acc, 0.3d);
+               udf.accumulate(acc, 5.3d);
+
+               udf.merge(acc, Arrays.asList());
+
+               assertEquals(6.1d, udf.getValue(acc));
+       }
+
+       @Test
+       public void testUDAFCount() throws Exception {
+               Object[] constantArgs = new Object[] {
+                       null
+               };
+
+               DataType[] argTypes = new DataType[] {
+                       DataTypes.DOUBLE()
+               };
+
+               HiveGenericUDAF udf = init(GenericUDAFCount.class, constantArgs, argTypes);
+
+               GenericUDAFEvaluator.AggregationBuffer acc = udf.createAccumulator();
+
+               udf.accumulate(acc, 0.5d);
+               udf.accumulate(acc, 0.3d);
+               udf.accumulate(acc, 5.3d);
+
+               udf.merge(acc, Arrays.asList());
+
+               assertEquals(3L, udf.getValue(acc));
+       }
+
+       private static HiveGenericUDAF init(Class hiveUdfClass, Object[] constantArgs, DataType[] argTypes) throws Exception {
+               HiveFunctionWrapper<GenericUDAFResolver2> wrapper = new HiveFunctionWrapper(hiveUdfClass.getName());
+
+               HiveGenericUDAF udf = new HiveGenericUDAF(wrapper);
+
+               udf.setArgumentTypesAndConstants(constantArgs, argTypes);
+               udf.getHiveResultType(constantArgs, argTypes);
+
+               udf.open(null);
+
+               return udf;
+       }
+
+}
