This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 8a0b9c7 [FLINK-12660][hive] Integrate Flink with Hive UDAF
8a0b9c7 is described below
commit 8a0b9c795752696f595fc75e10128fc47acfb066
Author: bowen.li <[email protected]>
AuthorDate: Tue Jun 25 13:28:39 2019 -0700
[FLINK-12660][hive] Integrate Flink with Hive UDAF
This PR adds support for Hive UDAFs in Flink.
Hive has mainly four UDAF interfaces: UDAF, GenericUDAFResolver,
GenericUDAFResolver2, and AbstractGenericUDAFResolver.
UDAF is a standalone interface that was deprecated in Hive a long time ago;
it is still supported here by wrapping it in a GenericUDAFBridge.
GenericUDAFResolver and AbstractGenericUDAFResolver are deprecated in favor
of GenericUDAFResolver2.
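For reference, a minimal usage sketch of the new class, mirroring the test
code added below (illustrative only; it drives the AggregateFunction
lifecycle by hand instead of going through the planner):

    // Wrap Hive's built-in SUM resolver and aggregate two doubles.
    HiveFunctionWrapper<GenericUDAFResolver2> wrapper =
            new HiveFunctionWrapper<>(GenericUDAFSum.class.getName());
    HiveGenericUDAF udaf = new HiveGenericUDAF(wrapper);
    udaf.setArgumentTypesAndConstants(
            new Object[] { null }, new DataType[] { DataTypes.DOUBLE() });
    udaf.open(null);

    GenericUDAFEvaluator.AggregationBuffer acc = udaf.createAccumulator();
    udaf.accumulate(acc, 1.5d);
    udaf.accumulate(acc, 2.5d);
    Object result = udaf.getValue(acc); // 4.0

    // For an old-style UDAF, pass isUDAFBridgeRequired = true so the
    // function is wrapped in a GenericUDAFBridge:
    // new HiveGenericUDAF(wrapper, true);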
This closes #8881.
---
.../table/functions/hive/HiveGenericUDAF.java | 215 +++++++++++++++++++++
.../table/functions/hive/HiveGenericUDAFTest.java | 121 ++++++++++++
2 files changed, 336 insertions(+)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java
new file mode 100644
index 0000000..163fc06
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.hive;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.hive.conversion.HiveInspectors;
+import org.apache.flink.table.functions.hive.conversion.HiveObjectConversion;
+import org.apache.flink.table.functions.hive.conversion.IdentityConversion;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter;
+
+import org.apache.hadoop.hive.ql.exec.UDAF;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBridge;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver2;
+import org.apache.hadoop.hive.ql.udf.generic.SimpleGenericUDAFParameterInfo;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+
+import java.util.Arrays;
+
+/**
+ * An {@link AggregateFunction} implementation that calls Hive's {@link UDAF} or {@link GenericUDAFEvaluator}.
+ */
+@Internal
+public class HiveGenericUDAF
+ extends AggregateFunction<Object, GenericUDAFEvaluator.AggregationBuffer> implements HiveFunction {
+
+ private final HiveFunctionWrapper hiveFunctionWrapper;
+ // Flag that indicates whether a bridge between GenericUDAF and UDAF is required.
+ // Old UDAF can be used with the GenericUDAF infrastructure through bridging.
+ private final boolean isUDAFBridgeRequired;
+
+ private Object[] constantArguments;
+ private DataType[] argTypes;
+
+ private transient GenericUDAFEvaluator partialEvaluator;
+ private transient GenericUDAFEvaluator finalEvaluator;
+ private transient ObjectInspector finalResult;
+ private transient HiveObjectConversion[] conversions;
+ private transient boolean allIdentityConverter;
+ private transient boolean initialized;
+
+ public HiveGenericUDAF(HiveFunctionWrapper funcWrapper) {
+ this(funcWrapper, false);
+ }
+
+ public HiveGenericUDAF(HiveFunctionWrapper funcWrapper, boolean isUDAFBridgeRequired) {
+ this.hiveFunctionWrapper = funcWrapper;
+ this.isUDAFBridgeRequired = isUDAFBridgeRequired;
+ }
+
+ @Override
+ public void open(FunctionContext context) throws Exception {
+ super.open(context);
+ init();
+ }
+
+ private void init() throws HiveException {
+ ObjectInspector[] inputInspectors = HiveInspectors.toInspectors(constantArguments, argTypes);
+
+ // Flink UDAF only supports Hive UDAF's PARTIAL1 and FINAL modes
+
+ // PARTIAL1: from original data to partial aggregation data:
+ // iterate() and terminatePartial() will be called.
+ this.partialEvaluator = createEvaluator(inputInspectors);
+ ObjectInspector partialResult = partialEvaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, inputInspectors);
+
+ // FINAL: from partial aggregation to full aggregation:
+ // merge() and terminate() will be called.
+ this.finalEvaluator = createEvaluator(inputInspectors);
+ this.finalResult = finalEvaluator.init(
+ GenericUDAFEvaluator.Mode.FINAL, new ObjectInspector[]{ partialResult });
+
+ conversions = new HiveObjectConversion[inputInspectors.length];
+ for (int i = 0; i < inputInspectors.length; i++) {
+ conversions[i] = HiveInspectors.getConversion(inputInspectors[i], argTypes[i].getLogicalType());
+ }
+ allIdentityConverter = Arrays.stream(conversions)
+ .allMatch(conv -> conv instanceof IdentityConversion);
+
+ initialized = true;
+ }
+
+ private GenericUDAFEvaluator createEvaluator(ObjectInspector[] inputInspectors) throws SemanticException {
+ GenericUDAFResolver2 resolver;
+
+ if (isUDAFBridgeRequired) {
+ resolver = new GenericUDAFBridge((UDAF) hiveFunctionWrapper.createFunction());
+ } else {
+ resolver = (GenericUDAFResolver2) hiveFunctionWrapper.createFunction();
+ }
+
+ return resolver.getEvaluator(
+ new SimpleGenericUDAFParameterInfo(
+ inputInspectors,
+ // Whether the UDAF invocation was from a windowing function call or not.
+ // TODO: investigate whether this has an impact on Flink streaming jobs with windows
+ Boolean.FALSE,
+ // Whether the UDAF invocation was qualified with the DISTINCT keyword.
+ // Note that this is provided for informational purposes only and the function implementation
+ // is not expected to ensure the distinct property for the parameter values.
+ // That is handled by the framework.
+ Boolean.FALSE,
+ // Whether the UDAF invocation was done via the wildcard syntax FUNCTION(*).
+ // Note that this is provided for informational purposes only and the function implementation
+ // is not expected to ensure the wildcard handling of the target relation.
+ // That is handled by the framework.
+ Boolean.FALSE));
+ }
+
+ /**
+ * This is invoked without calling open() in Blink, so we need to call init() for getNewAggregationBuffer().
+ * TODO: re-evaluate how this will fit into Flink's new type inference and UDF system
+ */
+ @Override
+ public GenericUDAFEvaluator.AggregationBuffer createAccumulator() {
+ try {
+ if (!initialized) {
+ init();
+ }
+ return partialEvaluator.getNewAggregationBuffer();
+ } catch (Exception e) {
+ throw new FlinkHiveUDFException(
+ String.format("Failed to create accumulator for
%s", hiveFunctionWrapper.getClassName()), e);
+ }
+ }
+
+ public void accumulate(GenericUDAFEvaluator.AggregationBuffer acc, Object... inputs) throws HiveException {
+ if (!allIdentityConverter) {
+ for (int i = 0; i < inputs.length; i++) {
+ inputs[i] = conversions[i].toHiveObject(inputs[i]);
+ }
+ }
+
+ partialEvaluator.iterate(acc, inputs);
+ }
+
+ public void merge(
+ GenericUDAFEvaluator.AggregationBuffer accumulator,
+ Iterable<GenericUDAFEvaluator.AggregationBuffer> its)
throws HiveException {
+
+ for (GenericUDAFEvaluator.AggregationBuffer buffer : its) {
+ finalEvaluator.merge(
+ accumulator, partialEvaluator.terminatePartial(buffer));
+ }
+ }
+
+ @Override
+ public Object getValue(GenericUDAFEvaluator.AggregationBuffer accumulator) {
+ try {
+ return HiveInspectors.toFlinkObject(finalResult, finalEvaluator.terminate(accumulator));
+ } catch (HiveException e) {
+ throw new FlinkHiveUDFException(
+ String.format("Failed to get final result on
%s", hiveFunctionWrapper.getClassName()), e);
+ }
+ }
+
+ @Override
+ public void setArgumentTypesAndConstants(Object[] constantArguments, DataType[] argTypes) {
+ this.constantArguments = constantArguments;
+ this.argTypes = argTypes;
+ }
+
+ @Override
+ public DataType getHiveResultType(Object[] constantArguments, DataType[] argTypes) {
+ try {
+ ObjectInspector[] inputs = HiveInspectors.toInspectors(constantArguments, argTypes);
+ GenericUDAFEvaluator evaluator = createEvaluator(inputs);
+
+ // The ObjectInspector for the parameters:
+ // In PARTIAL1 mode, the parameters are original data;
+ // In FINAL mode, the parameters are just partial aggregations
+ // (in that case, the array will always have a single element).
+
+ ObjectInspector partialObjectInspector = evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, inputs);
+
+ ObjectInspector finalObjectInspector = evaluator.init(
+ GenericUDAFEvaluator.Mode.FINAL,
+ new ObjectInspector[]{ partialObjectInspector });
+
+ return HiveTypeUtil.toFlinkType(finalObjectInspector);
+ } catch (Exception e) {
+ throw new FlinkHiveUDFException(
+ String.format("Failed to get Hive result type
from %s", hiveFunctionWrapper.getClassName()), e);
+ }
+ }
+
+ @Override
+ public TypeInformation getResultType() {
+ return LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(
+ getHiveResultType(this.constantArguments, this.argTypes));
+ }
+}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDAFTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDAFTest.java
new file mode 100644
index 0000000..94dd0fd
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDAFTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.hive;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.DataType;
+
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCount;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMin;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver2;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFSum;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test for {@link HiveGenericUDAF}.
+ */
+public class HiveGenericUDAFTest {
+ @Test
+ public void testUDAFMin() throws Exception {
+ Object[] constantArgs = new Object[] {
+ null
+ };
+
+ DataType[] argTypes = new DataType[] {
+ DataTypes.BIGINT()
+ };
+
+ HiveGenericUDAF udf = init(GenericUDAFMin.class, constantArgs, argTypes);
+
+ GenericUDAFEvaluator.AggregationBuffer acc = udf.createAccumulator();
+
+ udf.accumulate(acc, 2L);
+ udf.accumulate(acc, 3L);
+ udf.accumulate(acc, 1L);
+
+ udf.merge(acc, Arrays.asList());
+
+ assertEquals(1L, udf.getValue(acc));
+ }
+
+ @Test
+ public void testUDAFSum() throws Exception {
+ Object[] constantArgs = new Object[] {
+ null
+ };
+
+ DataType[] argTypes = new DataType[] {
+ DataTypes.DOUBLE()
+ };
+
+ HiveGenericUDAF udf = init(GenericUDAFSum.class, constantArgs, argTypes);
+
+ GenericUDAFEvaluator.AggregationBuffer acc = udf.createAccumulator();
+
+ udf.accumulate(acc, 0.5d);
+ udf.accumulate(acc, 0.3d);
+ udf.accumulate(acc, 5.3d);
+
+ udf.merge(acc, Arrays.asList());
+
+ assertEquals(6.1d, udf.getValue(acc));
+ }
+
+ @Test
+ public void testUDAFCount() throws Exception {
+ Object[] constantArgs = new Object[] {
+ null
+ };
+
+ DataType[] argTypes = new DataType[] {
+ DataTypes.DOUBLE()
+ };
+
+ HiveGenericUDAF udf = init(GenericUDAFCount.class, constantArgs, argTypes);
+
+ GenericUDAFEvaluator.AggregationBuffer acc = udf.createAccumulator();
+
+ udf.accumulate(acc, 0.5d);
+ udf.accumulate(acc, 0.3d);
+ udf.accumulate(acc, 5.3d);
+
+ udf.merge(acc, Arrays.asList());
+
+ assertEquals(3L, udf.getValue(acc));
+ }
+
+ private static HiveGenericUDAF init(Class hiveUdfClass, Object[] constantArgs, DataType[] argTypes) throws Exception {
+ HiveFunctionWrapper<GenericUDAFResolver2> wrapper = new HiveFunctionWrapper(hiveUdfClass.getName());
+
+ HiveGenericUDAF udf = new HiveGenericUDAF(wrapper);
+
+ udf.setArgumentTypesAndConstants(constantArgs, argTypes);
+ udf.getHiveResultType(constantArgs, argTypes);
+
+ udf.open(null);
+
+ return udf;
+ }
+
+}