This is an automated email from the ASF dual-hosted git repository.
zjffdu pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/branch-0.9 by this push:
new 80351f9 [ZEPPELIN-5272] Row return type FunctionHint doesn't work in
flink interpreter
80351f9 is described below
commit 80351f9ba6ac531b3d65069df6d2531bf4817226
Author: Jeff Zhang <[email protected]>
AuthorDate: Tue Mar 2 22:25:34 2021 +0800
[ZEPPELIN-5272] Row return type FunctionHint doesn't work in flink
interpreter
### What is this PR for?
The root cause is in flink side. And flink 1.11 fix this issue via
introducing a new api `createTemporarySystemFunction`. In this PR, I will this
method for `flink1.11-shim` and `flink1.12-shim`. Unit test is added as well.
### What type of PR is it?
[Bug Fix]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-5272
### How should this be tested?
* UT is added
### Screenshots (if appropriate)

### Questions:
* Does the licenses files need update? No
* Is there breaking changes for older versions? No
* Does this needs documentation? NO
Author: Jeff Zhang <[email protected]>
Closes #4065 from zjffdu/ZEPPELIN-5272 and squashes the following commits:
919055246 [Jeff Zhang] [ZEPPELIN-5272] Row return type FunctionHint doesn't
work in flink interpreter
(cherry picked from commit b29197e9d0e4d55da9132c6682f7a2647d96ca5d)
Signed-off-by: Jeff Zhang <[email protected]>
---
.../java/org/apache/zeppelin/flink/FlinkShims.java | 2 ++
.../org/apache/zeppelin/flink/Flink110Shims.java | 7 +++++
.../org/apache/zeppelin/flink/Flink111Shims.java | 6 ++++
.../org/apache/zeppelin/flink/Flink112Shims.java | 6 ++++
.../zeppelin/flink/FlinkScalaInterpreter.scala | 2 +-
.../flink/FlinkBatchSqlInterpreterTest.java | 34 ++++++++++++++++++++++
6 files changed, 56 insertions(+), 1 deletion(-)
diff --git
a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
index 717ef3d..bbe718c 100644
--- a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
+++ b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
@@ -121,6 +121,8 @@ public abstract class FlinkShims {
public abstract Object toDataSet(Object btenv, Object table);
+ public abstract void registerScalarFunction(Object btenv, String name,
Object scalarFunction);
+
public abstract void registerTableFunction(Object btenv, String name, Object
tableFunction);
public abstract void registerAggregateFunction(Object btenv, String name,
Object aggregateFunction);
diff --git
a/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java
b/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java
index ee551ea..4eaa905 100644
---
a/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java
+++
b/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java
@@ -41,8 +41,10 @@ import
org.apache.flink.table.api.scala.BatchTableEnvironment;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.functions.UserDefinedFunction;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;
import org.apache.flink.util.FlinkException;
@@ -195,6 +197,11 @@ public class Flink110Shims extends FlinkShims {
}
@Override
+ public void registerScalarFunction(Object btenv, String name, Object
scalarFunction) {
+ ((StreamTableEnvironmentImpl)(btenv)).registerFunction(name,
(ScalarFunction) scalarFunction);
+ }
+
+ @Override
public void registerTableFunction(Object btenv, String name, Object
tableFunction) {
((StreamTableEnvironmentImpl)(btenv)).registerFunction(name,
(TableFunction) tableFunction);
}
diff --git
a/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java
b/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java
index 9c26346..5cfd280 100644
---
a/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java
+++
b/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java
@@ -49,6 +49,7 @@ import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.operations.CatalogSinkModifyOperation;
@@ -247,6 +248,11 @@ public class Flink111Shims extends FlinkShims {
}
@Override
+ public void registerScalarFunction(Object btenv, String name, Object
scalarFunction) {
+ ((StreamTableEnvironmentImpl)(btenv)).createTemporarySystemFunction(name,
(ScalarFunction) scalarFunction);
+ }
+
+ @Override
public void registerTableFunction(Object btenv, String name, Object
tableFunction) {
((StreamTableEnvironmentImpl) (btenv)).registerFunction(name,
(TableFunction) tableFunction);
}
diff --git
a/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java
b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java
index 95f457d..f5dcef9 100644
---
a/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java
+++
b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java
@@ -50,6 +50,7 @@ import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.operations.CatalogSinkModifyOperation;
@@ -248,6 +249,11 @@ public class Flink112Shims extends FlinkShims {
}
@Override
+ public void registerScalarFunction(Object btenv, String name, Object
scalarFunction) {
+ ((StreamTableEnvironmentImpl)(btenv)).createTemporarySystemFunction(name,
(ScalarFunction) scalarFunction);
+ }
+
+ @Override
public void registerTableFunction(Object btenv, String name, Object
tableFunction) {
((StreamTableEnvironmentImpl) (btenv)).registerFunction(name,
(TableFunction) tableFunction);
}
diff --git
a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
index 572426b..4084f1f 100644
---
a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
+++
b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
@@ -487,7 +487,7 @@ class FlinkScalaInterpreter(val properties: Properties) {
val udf = c.newInstance()
if (udf.isInstanceOf[ScalarFunction]) {
val scalarUDF = udf.asInstanceOf[ScalarFunction]
- btenv.registerFunction(c.getSimpleName, scalarUDF)
+ flinkShims.registerScalarFunction(btenv, c.getSimpleName,
scalarUDF)
} else if (udf.isInstanceOf[TableFunction[_]]) {
val tableUDF = udf.asInstanceOf[TableFunction[_]]
flinkShims.registerTableFunction(btenv, c.getSimpleName,
tableUDF)
diff --git
a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java
b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java
index b3c3ae4..6077ff5 100644
---
a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java
+++
b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java
@@ -401,4 +401,38 @@ public class FlinkBatchSqlInterpreterTest extends
SqlInterpreterTest {
resultMessages = context.out.toInterpreterResultMessage();
assertEquals("id\tname\n1\ta\n2\tb\n", resultMessages.get(0).getData());
}
+
+ @Test
+ public void testFunctionHintRowType() throws InterpreterException,
IOException {
+ if (flinkInterpreter.getFlinkVersion().isFlink110()) {
+ // Row Type hint is not supported in flink 1.10
+ return;
+ }
+ // define table function with TableHint of Row return type
+ InterpreterContext context = getInterpreterContext();
+ InterpreterResult result = flinkInterpreter.interpret(
+ "import org.apache.flink.table.annotation.FunctionHint\n" +
+ "import org.apache.flink.table.functions.TableFunction\n" +
+ "import org.apache.flink.types.Row\n" +
+ "import org.apache.flink.api.scala._\n" +
+ "import org.apache.flink.table.annotation.DataTypeHint\n" +
+ "\n" +
+ "@FunctionHint(output = new DataTypeHint(\"ROW<sum STRING,
result INT>\"))\n" +
+ "class OverloadedFunction extends TableFunction[Row] {\n" +
+ " def eval(a: Int, b: Int): Unit = {\n" +
+ " collect(Row.of(\"Sum\", Int.box(a + b)))\n" +
+ " }\n" +
+ "}\n" +
+ "\n" +
+ "btenv.createTemporarySystemFunction(\"SumUdf\", new
OverloadedFunction())", context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+ context = getInterpreterContext();
+ result = sqlInterpreter.interpret(
+ "select * FROM LATERAL TABLE(SumUdf(1,2));", context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ List<InterpreterResultMessage> resultMessages =
context.out.toInterpreterResultMessage();
+ assertEquals(InterpreterResult.Type.TABLE,
resultMessages.get(0).getType());
+ assertEquals("sum\tresult\nSum\t3\n", resultMessages.get(0).getData());
+ }
}