This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.12 by this push:
     new a09dcb2  [FLINK-20822][hive] Don't check whether a function is generic in Hive catalog
a09dcb2 is described below

commit a09dcb2a735fc79cf5eb9fd7c9bda88486fafab8
Author: Rui Li <[email protected]>
AuthorDate: Tue Jan 5 15:41:37 2021 +0800

    [FLINK-20822][hive] Don't check whether a function is generic in Hive catalog
    
    This fixes the ClassNotFound problem when creating a generic function whose class is not loaded in the Hive catalog.
    
    This closes #14555
---
 .../flink/table/catalog/hive/HiveCatalog.java      | 60 ++++++++----------
 .../hive/HiveCatalogGenericMetadataTest.java       | 71 ++++++++++++++++++++++
 .../catalog/hive/HiveCatalogHiveMetadataTest.java  | 14 +++++
 .../catalog/hive/HiveCatalogMetadataTestBase.java  | 12 ----
 4 files changed, 109 insertions(+), 48 deletions(-)

diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index c9bee0e..95ad2fd 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -137,12 +137,9 @@ public class HiveCatalog extends AbstractCatalog {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(HiveCatalog.class);
 
-    // Prefix used to distinguish Flink functions from Hive functions.
-    // It's appended to Flink function's class name
-    // because Hive's Function object doesn't have properties or other place 
to store the flag for
-    // Flink functions.
-    private static final String FLINK_FUNCTION_PREFIX = "flink:";
-    private static final String FLINK_PYTHON_FUNCTION_PREFIX = 
FLINK_FUNCTION_PREFIX + "python:";
+    // Prefix used to distinguish scala/python functions
+    private static final String FLINK_SCALA_FUNCTION_PREFIX = "flink:scala:";
+    private static final String FLINK_PYTHON_FUNCTION_PREFIX = "flink:python:";
 
     private final HiveConf hiveConf;
     private final String hiveVersion;
@@ -1254,15 +1251,8 @@ public class HiveCatalog extends AbstractCatalog {
         checkNotNull(newFunction, "newFunction cannot be null");
 
         try {
-            CatalogFunction existingFunction = getFunction(functionPath);
-            boolean existingType = existingFunction.isGeneric();
-            boolean newType = newFunction.isGeneric();
-            if (existingType != newType) {
-                throw new CatalogException(
-                        String.format(
-                                "Function types don't match. Existing function 
%s generic, and new function %s generic.",
-                                existingType ? "is" : "isn't", newType ? "is" 
: "isn't"));
-            }
+            // check if function exists
+            getFunction(functionPath);
 
             Function hiveFunction;
             if (newFunction instanceof CatalogFunctionImpl) {
@@ -1334,16 +1324,18 @@ public class HiveCatalog extends AbstractCatalog {
                     client.getFunction(
                             functionPath.getDatabaseName(), 
functionPath.getObjectName());
 
-            if (function.getClassName().startsWith(FLINK_FUNCTION_PREFIX)) {
-                if 
(function.getClassName().startsWith(FLINK_PYTHON_FUNCTION_PREFIX)) {
-                    return new CatalogFunctionImpl(
-                            function.getClassName()
-                                    
.substring(FLINK_PYTHON_FUNCTION_PREFIX.length()),
-                            FunctionLanguage.PYTHON);
-                } else {
-                    return new CatalogFunctionImpl(
-                            
function.getClassName().substring(FLINK_FUNCTION_PREFIX.length()));
-                }
+            if 
(function.getClassName().startsWith(FLINK_PYTHON_FUNCTION_PREFIX)) {
+                return new CatalogFunctionImpl(
+                        
function.getClassName().substring(FLINK_PYTHON_FUNCTION_PREFIX.length()),
+                        FunctionLanguage.PYTHON);
+            } else if 
(function.getClassName().startsWith(FLINK_SCALA_FUNCTION_PREFIX)) {
+                return new CatalogFunctionImpl(
+                        
function.getClassName().substring(FLINK_SCALA_FUNCTION_PREFIX.length()),
+                        FunctionLanguage.SCALA);
+            } else if (function.getClassName().startsWith("flink:")) {
+                // to be compatible with old behavior
+                return new CatalogFunctionImpl(
+                        function.getClassName().substring("flink:".length()));
             } else {
                 return new CatalogFunctionImpl(function.getClassName());
             }
@@ -1375,23 +1367,19 @@ public class HiveCatalog extends AbstractCatalog {
 
     private static Function instantiateHiveFunction(
             ObjectPath functionPath, CatalogFunction function) {
-
-        boolean isGeneric = function.isGeneric();
-
         // Hive Function does not have properties map
-        // thus, use a prefix in class name to distinguish Flink and Hive 
functions
+        // thus, use a prefix in class name to distinguish Java/Scala and 
Python functions
         String functionClassName;
-        if (function.getFunctionLanguage().equals(FunctionLanguage.JAVA)) {
-            functionClassName =
-                    isGeneric
-                            ? FLINK_FUNCTION_PREFIX + function.getClassName()
-                            : function.getClassName();
-        } else if 
(function.getFunctionLanguage().equals(FunctionLanguage.PYTHON)) {
+        if (function.getFunctionLanguage() == FunctionLanguage.JAVA) {
+            functionClassName = function.getClassName();
+        } else if (function.getFunctionLanguage() == FunctionLanguage.SCALA) {
+            functionClassName = FLINK_SCALA_FUNCTION_PREFIX + 
function.getClassName();
+        } else if (function.getFunctionLanguage() == FunctionLanguage.PYTHON) {
             functionClassName = FLINK_PYTHON_FUNCTION_PREFIX + 
function.getClassName();
         } else {
             throw new UnsupportedOperationException(
                     "HiveCatalog supports only creating"
-                            + " JAVA or PYTHON based function for now");
+                            + " JAVA/SCALA or PYTHON based function for now");
         }
 
         return new Function(
diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java
index 013a640..c3562c3 100644
--- 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java
@@ -21,16 +21,26 @@ package org.apache.flink.table.catalog.hive;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogFunctionImpl;
 import org.apache.flink.table.catalog.CatalogPartition;
 import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.FunctionLanguage;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.config.CatalogConfig;
+import org.apache.flink.table.functions.TestGenericUDF;
+import org.apache.flink.table.functions.TestSimpleUDF;
 import org.apache.flink.table.types.DataType;
 
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.FunctionType;
+import org.apache.hadoop.hive.metastore.api.PrincipalType;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.util.ArrayList;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -288,6 +298,56 @@ public class HiveCatalogGenericMetadataTest extends 
HiveCatalogMetadataTestBase
         }
     }
 
+    @Test
+    public void testFunctionCompatibility() throws Exception {
+        catalog.createDatabase(db1, createDb(), false);
+        // create a function with old prefix 'flink:' and make sure we can 
properly retrieve it
+        ((HiveCatalog) catalog)
+                .client.createFunction(
+                        new Function(
+                                path1.getObjectName().toLowerCase(),
+                                path1.getDatabaseName(),
+                                "flink:class.name",
+                                null,
+                                PrincipalType.GROUP,
+                                (int) (System.currentTimeMillis() / 1000),
+                                FunctionType.JAVA,
+                                new ArrayList<>()));
+        CatalogFunction catalogFunction = catalog.getFunction(path1);
+        assertEquals("class.name", catalogFunction.getClassName());
+        assertEquals(FunctionLanguage.JAVA, 
catalogFunction.getFunctionLanguage());
+    }
+
+    // ------ functions ------
+
+    @Test
+    public void testFunctionWithNonExistClass() throws Exception {
+        // to make sure hive catalog doesn't check function class
+        catalog.createDatabase(db1, createDb(), false);
+        CatalogFunction catalogFunction =
+                new CatalogFunctionImpl("non.exist.scala.class", 
FunctionLanguage.SCALA);
+        catalog.createFunction(path1, catalogFunction, false);
+        assertEquals(catalogFunction.getClassName(), 
catalog.getFunction(path1).getClassName());
+        assertEquals(
+                catalogFunction.getFunctionLanguage(),
+                catalog.getFunction(path1).getFunctionLanguage());
+        // alter the function
+        catalogFunction = new CatalogFunctionImpl("non.exist.java.class", 
FunctionLanguage.JAVA);
+        catalog.alterFunction(path1, catalogFunction, false);
+        assertEquals(catalogFunction.getClassName(), 
catalog.getFunction(path1).getClassName());
+        assertEquals(
+                catalogFunction.getFunctionLanguage(),
+                catalog.getFunction(path1).getFunctionLanguage());
+
+        catalogFunction =
+                new CatalogFunctionImpl("non.exist.python.class", 
FunctionLanguage.PYTHON);
+        catalog.alterFunction(path1, catalogFunction, false);
+        assertEquals(catalogFunction.getClassName(), 
catalog.getFunction(path1).getClassName());
+        assertEquals(
+                catalogFunction.getFunctionLanguage(),
+                catalog.getFunction(path1).getFunctionLanguage());
+    }
+
     // ------ partitions ------
 
     @Test
@@ -385,4 +445,15 @@ public class HiveCatalogGenericMetadataTest extends 
HiveCatalogMetadataTestBase
     public CatalogPartition createPartition() {
         throw new UnsupportedOperationException();
     }
+
+    @Override
+    protected CatalogFunction createFunction() {
+        return new 
CatalogFunctionImpl(TestGenericUDF.class.getCanonicalName());
+    }
+
+    @Override
+    protected CatalogFunction createAnotherFunction() {
+        return new CatalogFunctionImpl(
+                TestSimpleUDF.class.getCanonicalName(), 
FunctionLanguage.SCALA);
+    }
 }
diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
index 2c2fb76..c6ce919 100644
--- 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
@@ -25,6 +25,8 @@ import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.constraints.UniqueConstraint;
 import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogFunctionImpl;
 import org.apache.flink.table.catalog.CatalogPartition;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.CatalogTable;
@@ -47,6 +49,8 @@ import org.apache.flink.util.StringUtils;
 
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.udf.UDFRand;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFAbs;
 import org.junit.Assume;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -290,4 +294,14 @@ public class HiveCatalogHiveMetadataTest extends 
HiveCatalogMetadataTestBase {
     public CatalogTable createStreamingTable() {
         throw new UnsupportedOperationException("Hive table cannot be 
streaming.");
     }
+
+    @Override
+    protected CatalogFunction createFunction() {
+        return new CatalogFunctionImpl(GenericUDFAbs.class.getName());
+    }
+
+    @Override
+    protected CatalogFunction createAnotherFunction() {
+        return new CatalogFunctionImpl(UDFRand.class.getName());
+    }
 }
diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogMetadataTestBase.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogMetadataTestBase.java
index 300ae24..b44f62b 100644
--- 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogMetadataTestBase.java
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogMetadataTestBase.java
@@ -23,8 +23,6 @@ import org.apache.flink.table.catalog.CatalogFunctionImpl;
 import org.apache.flink.table.catalog.CatalogTestBase;
 import org.apache.flink.table.catalog.FunctionLanguage;
 import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.functions.hive.HiveGenericUDF;
-import org.apache.flink.table.functions.hive.HiveSimpleUDF;
 
 import org.junit.Assume;
 import org.junit.Test;
@@ -58,16 +56,6 @@ public abstract class HiveCatalogMetadataTestBase extends 
CatalogTestBase {
     }
 
     @Override
-    protected CatalogFunction createFunction() {
-        return new CatalogFunctionImpl(HiveSimpleUDF.class.getCanonicalName());
-    }
-
-    @Override
-    protected CatalogFunction createAnotherFunction() {
-        return new 
CatalogFunctionImpl(HiveGenericUDF.class.getCanonicalName());
-    }
-
-    @Override
     protected CatalogFunction createPythonFunction() {
         return new CatalogFunctionImpl("test.func1", FunctionLanguage.PYTHON);
     }

Reply via email to