This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.12 by this push:
new a09dcb2 [FLINK-20822][hive] Don't check whether a function is generic
in Hive catalog
a09dcb2 is described below
commit a09dcb2a735fc79cf5eb9fd7c9bda88486fafab8
Author: Rui Li <[email protected]>
AuthorDate: Tue Jan 5 15:41:37 2021 +0800
[FLINK-20822][hive] Don't check whether a function is generic in Hive
catalog
This fixes the ClassNotFound problem that occurs when a generic function
whose class is not loaded is created in the Hive catalog.
This closes #14555
---
.../flink/table/catalog/hive/HiveCatalog.java | 60 ++++++++----------
.../hive/HiveCatalogGenericMetadataTest.java | 71 ++++++++++++++++++++++
.../catalog/hive/HiveCatalogHiveMetadataTest.java | 14 +++++
.../catalog/hive/HiveCatalogMetadataTestBase.java | 12 ----
4 files changed, 109 insertions(+), 48 deletions(-)
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index c9bee0e..95ad2fd 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -137,12 +137,9 @@ public class HiveCatalog extends AbstractCatalog {
private static final Logger LOG =
LoggerFactory.getLogger(HiveCatalog.class);
- // Prefix used to distinguish Flink functions from Hive functions.
- // It's appended to Flink function's class name
- // because Hive's Function object doesn't have properties or other place
to store the flag for
- // Flink functions.
- private static final String FLINK_FUNCTION_PREFIX = "flink:";
- private static final String FLINK_PYTHON_FUNCTION_PREFIX =
FLINK_FUNCTION_PREFIX + "python:";
+ // Prefix used to distinguish scala/python functions
+ private static final String FLINK_SCALA_FUNCTION_PREFIX = "flink:scala:";
+ private static final String FLINK_PYTHON_FUNCTION_PREFIX = "flink:python:";
private final HiveConf hiveConf;
private final String hiveVersion;
@@ -1254,15 +1251,8 @@ public class HiveCatalog extends AbstractCatalog {
checkNotNull(newFunction, "newFunction cannot be null");
try {
- CatalogFunction existingFunction = getFunction(functionPath);
- boolean existingType = existingFunction.isGeneric();
- boolean newType = newFunction.isGeneric();
- if (existingType != newType) {
- throw new CatalogException(
- String.format(
- "Function types don't match. Existing function
%s generic, and new function %s generic.",
- existingType ? "is" : "isn't", newType ? "is"
: "isn't"));
- }
+ // check if function exists
+ getFunction(functionPath);
Function hiveFunction;
if (newFunction instanceof CatalogFunctionImpl) {
@@ -1334,16 +1324,18 @@ public class HiveCatalog extends AbstractCatalog {
client.getFunction(
functionPath.getDatabaseName(),
functionPath.getObjectName());
- if (function.getClassName().startsWith(FLINK_FUNCTION_PREFIX)) {
- if
(function.getClassName().startsWith(FLINK_PYTHON_FUNCTION_PREFIX)) {
- return new CatalogFunctionImpl(
- function.getClassName()
-
.substring(FLINK_PYTHON_FUNCTION_PREFIX.length()),
- FunctionLanguage.PYTHON);
- } else {
- return new CatalogFunctionImpl(
-
function.getClassName().substring(FLINK_FUNCTION_PREFIX.length()));
- }
+ if
(function.getClassName().startsWith(FLINK_PYTHON_FUNCTION_PREFIX)) {
+ return new CatalogFunctionImpl(
+
function.getClassName().substring(FLINK_PYTHON_FUNCTION_PREFIX.length()),
+ FunctionLanguage.PYTHON);
+ } else if
(function.getClassName().startsWith(FLINK_SCALA_FUNCTION_PREFIX)) {
+ return new CatalogFunctionImpl(
+
function.getClassName().substring(FLINK_SCALA_FUNCTION_PREFIX.length()),
+ FunctionLanguage.SCALA);
+ } else if (function.getClassName().startsWith("flink:")) {
+ // to be compatible with old behavior
+ return new CatalogFunctionImpl(
+ function.getClassName().substring("flink:".length()));
} else {
return new CatalogFunctionImpl(function.getClassName());
}
@@ -1375,23 +1367,19 @@ public class HiveCatalog extends AbstractCatalog {
private static Function instantiateHiveFunction(
ObjectPath functionPath, CatalogFunction function) {
-
- boolean isGeneric = function.isGeneric();
-
// Hive Function does not have properties map
- // thus, use a prefix in class name to distinguish Flink and Hive
functions
+ // thus, use a prefix in class name to distinguish Java/Scala and
Python functions
String functionClassName;
- if (function.getFunctionLanguage().equals(FunctionLanguage.JAVA)) {
- functionClassName =
- isGeneric
- ? FLINK_FUNCTION_PREFIX + function.getClassName()
- : function.getClassName();
- } else if
(function.getFunctionLanguage().equals(FunctionLanguage.PYTHON)) {
+ if (function.getFunctionLanguage() == FunctionLanguage.JAVA) {
+ functionClassName = function.getClassName();
+ } else if (function.getFunctionLanguage() == FunctionLanguage.SCALA) {
+ functionClassName = FLINK_SCALA_FUNCTION_PREFIX +
function.getClassName();
+ } else if (function.getFunctionLanguage() == FunctionLanguage.PYTHON) {
functionClassName = FLINK_PYTHON_FUNCTION_PREFIX +
function.getClassName();
} else {
throw new UnsupportedOperationException(
"HiveCatalog supports only creating"
- + " JAVA or PYTHON based function for now");
+ + " JAVA/SCALA or PYTHON based function for now");
}
return new Function(
diff --git
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java
index 013a640..c3562c3 100644
---
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java
+++
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java
@@ -21,16 +21,26 @@ package org.apache.flink.table.catalog.hive;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogFunctionImpl;
import org.apache.flink.table.catalog.CatalogPartition;
import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.FunctionLanguage;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.config.CatalogConfig;
+import org.apache.flink.table.functions.TestGenericUDF;
+import org.apache.flink.table.functions.TestSimpleUDF;
import org.apache.flink.table.types.DataType;
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.FunctionType;
+import org.apache.hadoop.hive.metastore.api.PrincipalType;
import org.apache.hadoop.hive.metastore.api.Table;
import org.junit.BeforeClass;
import org.junit.Test;
+import java.util.ArrayList;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -288,6 +298,56 @@ public class HiveCatalogGenericMetadataTest extends
HiveCatalogMetadataTestBase
}
}
+ @Test
+ public void testFunctionCompatibility() throws Exception {
+ catalog.createDatabase(db1, createDb(), false);
+ // create a function with old prefix 'flink:' and make sure we can
properly retrieve it
+ ((HiveCatalog) catalog)
+ .client.createFunction(
+ new Function(
+ path1.getObjectName().toLowerCase(),
+ path1.getDatabaseName(),
+ "flink:class.name",
+ null,
+ PrincipalType.GROUP,
+ (int) (System.currentTimeMillis() / 1000),
+ FunctionType.JAVA,
+ new ArrayList<>()));
+ CatalogFunction catalogFunction = catalog.getFunction(path1);
+ assertEquals("class.name", catalogFunction.getClassName());
+ assertEquals(FunctionLanguage.JAVA,
catalogFunction.getFunctionLanguage());
+ }
+
+ // ------ functions ------
+
+ @Test
+ public void testFunctionWithNonExistClass() throws Exception {
+ // to make sure hive catalog doesn't check function class
+ catalog.createDatabase(db1, createDb(), false);
+ CatalogFunction catalogFunction =
+ new CatalogFunctionImpl("non.exist.scala.class",
FunctionLanguage.SCALA);
+ catalog.createFunction(path1, catalogFunction, false);
+ assertEquals(catalogFunction.getClassName(),
catalog.getFunction(path1).getClassName());
+ assertEquals(
+ catalogFunction.getFunctionLanguage(),
+ catalog.getFunction(path1).getFunctionLanguage());
+ // alter the function
+ catalogFunction = new CatalogFunctionImpl("non.exist.java.class",
FunctionLanguage.JAVA);
+ catalog.alterFunction(path1, catalogFunction, false);
+ assertEquals(catalogFunction.getClassName(),
catalog.getFunction(path1).getClassName());
+ assertEquals(
+ catalogFunction.getFunctionLanguage(),
+ catalog.getFunction(path1).getFunctionLanguage());
+
+ catalogFunction =
+ new CatalogFunctionImpl("non.exist.python.class",
FunctionLanguage.PYTHON);
+ catalog.alterFunction(path1, catalogFunction, false);
+ assertEquals(catalogFunction.getClassName(),
catalog.getFunction(path1).getClassName());
+ assertEquals(
+ catalogFunction.getFunctionLanguage(),
+ catalog.getFunction(path1).getFunctionLanguage());
+ }
+
// ------ partitions ------
@Test
@@ -385,4 +445,15 @@ public class HiveCatalogGenericMetadataTest extends
HiveCatalogMetadataTestBase
public CatalogPartition createPartition() {
throw new UnsupportedOperationException();
}
+
+ @Override
+ protected CatalogFunction createFunction() {
+ return new
CatalogFunctionImpl(TestGenericUDF.class.getCanonicalName());
+ }
+
+ @Override
+ protected CatalogFunction createAnotherFunction() {
+ return new CatalogFunctionImpl(
+ TestSimpleUDF.class.getCanonicalName(),
FunctionLanguage.SCALA);
+ }
}
diff --git
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
index 2c2fb76..c6ce919 100644
---
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
+++
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
@@ -25,6 +25,8 @@ import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.constraints.UniqueConstraint;
import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogFunctionImpl;
import org.apache.flink.table.catalog.CatalogPartition;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.CatalogTable;
@@ -47,6 +49,8 @@ import org.apache.flink.util.StringUtils;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.udf.UDFRand;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFAbs;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -290,4 +294,14 @@ public class HiveCatalogHiveMetadataTest extends
HiveCatalogMetadataTestBase {
public CatalogTable createStreamingTable() {
throw new UnsupportedOperationException("Hive table cannot be
streaming.");
}
+
+ @Override
+ protected CatalogFunction createFunction() {
+ return new CatalogFunctionImpl(GenericUDFAbs.class.getName());
+ }
+
+ @Override
+ protected CatalogFunction createAnotherFunction() {
+ return new CatalogFunctionImpl(UDFRand.class.getName());
+ }
}
diff --git
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogMetadataTestBase.java
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogMetadataTestBase.java
index 300ae24..b44f62b 100644
---
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogMetadataTestBase.java
+++
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogMetadataTestBase.java
@@ -23,8 +23,6 @@ import org.apache.flink.table.catalog.CatalogFunctionImpl;
import org.apache.flink.table.catalog.CatalogTestBase;
import org.apache.flink.table.catalog.FunctionLanguage;
import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.functions.hive.HiveGenericUDF;
-import org.apache.flink.table.functions.hive.HiveSimpleUDF;
import org.junit.Assume;
import org.junit.Test;
@@ -58,16 +56,6 @@ public abstract class HiveCatalogMetadataTestBase extends
CatalogTestBase {
}
@Override
- protected CatalogFunction createFunction() {
- return new CatalogFunctionImpl(HiveSimpleUDF.class.getCanonicalName());
- }
-
- @Override
- protected CatalogFunction createAnotherFunction() {
- return new
CatalogFunctionImpl(HiveGenericUDF.class.getCanonicalName());
- }
-
- @Override
protected CatalogFunction createPythonFunction() {
return new CatalogFunctionImpl("test.func1", FunctionLanguage.PYTHON);
}