This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new b29197e  [ZEPPELIN-5272] Row return type FunctionHint doesn't work in 
flink interpreter
b29197e is described below

commit b29197e9d0e4d55da9132c6682f7a2647d96ca5d
Author: Jeff Zhang <[email protected]>
AuthorDate: Tue Mar 2 22:25:34 2021 +0800

    [ZEPPELIN-5272] Row return type FunctionHint doesn't work in flink 
interpreter
    
    ### What is this PR for?
    
    The root cause is on the Flink side. Flink 1.11 fixes this issue by 
introducing a new API, `createTemporarySystemFunction`. In this PR, I use this 
method for `flink1.11-shim` and `flink1.12-shim`. A unit test is added as well.
    
    ### What type of PR is it?
    [Bug Fix]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-5272
    
    ### How should this be tested?
    * UT is added
    
    ### Screenshots (if appropriate)
    
![image](https://user-images.githubusercontent.com/164491/109663552-34c4ce00-7ba7-11eb-9f17-56091d669bc6.png)
    
    ### Questions:
    * Do the license files need an update? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <[email protected]>
    
    Closes #4065 from zjffdu/ZEPPELIN-5272 and squashes the following commits:
    
    919055246 [Jeff Zhang] [ZEPPELIN-5272] Row return type FunctionHint doesn't 
work in flink interpreter
---
 .../java/org/apache/zeppelin/flink/FlinkShims.java |  2 ++
 .../org/apache/zeppelin/flink/Flink110Shims.java   |  7 +++++
 .../org/apache/zeppelin/flink/Flink111Shims.java   |  6 ++++
 .../org/apache/zeppelin/flink/Flink112Shims.java   |  6 ++++
 .../zeppelin/flink/FlinkScalaInterpreter.scala     |  2 +-
 .../flink/FlinkBatchSqlInterpreterTest.java        | 34 ++++++++++++++++++++++
 6 files changed, 56 insertions(+), 1 deletion(-)

diff --git 
a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java 
b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
index 717ef3d..bbe718c 100644
--- a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
+++ b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
@@ -121,6 +121,8 @@ public abstract class FlinkShims {
 
   public abstract Object toDataSet(Object btenv, Object table);
 
+  public abstract void registerScalarFunction(Object btenv, String name, 
Object scalarFunction);
+
   public abstract void registerTableFunction(Object btenv, String name, Object 
tableFunction);
 
   public abstract void registerAggregateFunction(Object btenv, String name, 
Object aggregateFunction);
diff --git 
a/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java
 
b/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java
index ee551ea..4eaa905 100644
--- 
a/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java
+++ 
b/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java
@@ -41,8 +41,10 @@ import 
org.apache.flink.table.api.scala.BatchTableEnvironment;
 import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.catalog.GenericInMemoryCatalog;
 import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.ScalarFunction;
 import org.apache.flink.table.functions.TableAggregateFunction;
 import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.functions.UserDefinedFunction;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.FlinkException;
@@ -195,6 +197,11 @@ public class Flink110Shims extends FlinkShims {
   }
 
   @Override
+  public void registerScalarFunction(Object btenv, String name, Object 
scalarFunction) {
+    ((StreamTableEnvironmentImpl)(btenv)).registerFunction(name, 
(ScalarFunction) scalarFunction);
+  }
+
+  @Override
   public void registerTableFunction(Object btenv, String name, Object 
tableFunction) {
     ((StreamTableEnvironmentImpl)(btenv)).registerFunction(name, 
(TableFunction) tableFunction);
   }
diff --git 
a/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java
 
b/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java
index 9c26346..5cfd280 100644
--- 
a/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java
+++ 
b/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java
@@ -49,6 +49,7 @@ import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.catalog.GenericInMemoryCatalog;
 import org.apache.flink.table.delegation.Parser;
 import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.ScalarFunction;
 import org.apache.flink.table.functions.TableAggregateFunction;
 import org.apache.flink.table.functions.TableFunction;
 import org.apache.flink.table.operations.CatalogSinkModifyOperation;
@@ -247,6 +248,11 @@ public class Flink111Shims extends FlinkShims {
   }
 
   @Override
+  public void registerScalarFunction(Object btenv, String name, Object 
scalarFunction) {
+    ((StreamTableEnvironmentImpl)(btenv)).createTemporarySystemFunction(name, 
(ScalarFunction) scalarFunction);
+  }
+
+  @Override
   public void registerTableFunction(Object btenv, String name, Object 
tableFunction) {
     ((StreamTableEnvironmentImpl) (btenv)).registerFunction(name, 
(TableFunction) tableFunction);
   }
diff --git 
a/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java
 
b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java
index 95f457d..f5dcef9 100644
--- 
a/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java
+++ 
b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java
@@ -50,6 +50,7 @@ import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.catalog.GenericInMemoryCatalog;
 import org.apache.flink.table.delegation.Parser;
 import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.ScalarFunction;
 import org.apache.flink.table.functions.TableAggregateFunction;
 import org.apache.flink.table.functions.TableFunction;
 import org.apache.flink.table.operations.CatalogSinkModifyOperation;
@@ -248,6 +249,11 @@ public class Flink112Shims extends FlinkShims {
   }
 
   @Override
+  public void registerScalarFunction(Object btenv, String name, Object 
scalarFunction) {
+    ((StreamTableEnvironmentImpl)(btenv)).createTemporarySystemFunction(name, 
(ScalarFunction) scalarFunction);
+  }
+
+  @Override
   public void registerTableFunction(Object btenv, String name, Object 
tableFunction) {
     ((StreamTableEnvironmentImpl) (btenv)).registerFunction(name, 
(TableFunction) tableFunction);
   }
diff --git 
a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
 
b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
index 572426b..4084f1f 100644
--- 
a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
+++ 
b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
@@ -487,7 +487,7 @@ class FlinkScalaInterpreter(val properties: Properties) {
             val udf = c.newInstance()
             if (udf.isInstanceOf[ScalarFunction]) {
               val scalarUDF = udf.asInstanceOf[ScalarFunction]
-              btenv.registerFunction(c.getSimpleName, scalarUDF)
+              flinkShims.registerScalarFunction(btenv, c.getSimpleName, 
scalarUDF)
             } else if (udf.isInstanceOf[TableFunction[_]]) {
               val tableUDF = udf.asInstanceOf[TableFunction[_]]
               flinkShims.registerTableFunction(btenv, c.getSimpleName, 
tableUDF)
diff --git 
a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java
 
b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java
index b3c3ae4..6077ff5 100644
--- 
a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java
+++ 
b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java
@@ -401,4 +401,38 @@ public class FlinkBatchSqlInterpreterTest extends 
SqlInterpreterTest {
     resultMessages = context.out.toInterpreterResultMessage();
     assertEquals("id\tname\n1\ta\n2\tb\n", resultMessages.get(0).getData());
   }
+
+  @Test
+  public void testFunctionHintRowType() throws InterpreterException, 
IOException {
+    if (flinkInterpreter.getFlinkVersion().isFlink110()) {
+      // Row Type hint is not supported in flink 1.10
+      return;
+    }
+    // define a table function with a FunctionHint declaring a Row return type
+    InterpreterContext context = getInterpreterContext();
+    InterpreterResult result = flinkInterpreter.interpret(
+            "import org.apache.flink.table.annotation.FunctionHint\n" +
+                    "import org.apache.flink.table.functions.TableFunction\n" +
+                    "import org.apache.flink.types.Row\n" +
+                    "import org.apache.flink.api.scala._\n" +
+                    "import org.apache.flink.table.annotation.DataTypeHint\n" +
+                    "\n" +
+                    "@FunctionHint(output = new DataTypeHint(\"ROW<sum STRING, 
result INT>\"))\n" +
+                    "class OverloadedFunction extends TableFunction[Row] {\n" +
+                    "  def eval(a: Int, b: Int): Unit = {\n" +
+                    "    collect(Row.of(\"Sum\", Int.box(a + b)))\n" +
+                    "  }\n" +
+                    "}\n" +
+                    "\n" +
+                    "btenv.createTemporarySystemFunction(\"SumUdf\", new 
OverloadedFunction())", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    context = getInterpreterContext();
+    result = sqlInterpreter.interpret(
+            "select * FROM LATERAL TABLE(SumUdf(1,2));", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    List<InterpreterResultMessage> resultMessages = 
context.out.toInterpreterResultMessage();
+    assertEquals(InterpreterResult.Type.TABLE, 
resultMessages.get(0).getType());
+    assertEquals("sum\tresult\nSum\t3\n", resultMessages.get(0).getData());
+  }
 }

Reply via email to