This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit af4d880779738dd2a4817489b0e91bf68c670da9
Author: Timo Walther <[email protected]>
AuthorDate: Tue Dec 17 10:46:13 2019 +0100

    [hotfix][table] Update function tests to use testing infrastructure
---
 .../table/planner/functions/FunctionTestBase.java  | 362 -----------------
 .../planner/runtime/batch/sql/FunctionITCase.java  |  39 --
 .../planner/runtime/stream/sql/FunctionITCase.java | 390 ++++++++++++++++--
 .../flink/table/functions/FunctionTestBase.java    | 436 ---------------------
 .../table/runtime/batch/sql/FunctionITCase.java    |  43 --
 .../table/runtime/stream/sql/FunctionITCase.java   | 431 +++++++++++++++++++-
 6 files changed, 767 insertions(+), 934 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/FunctionTestBase.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/FunctionTestBase.java
deleted file mode 100644
index e788df9..0000000
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/FunctionTestBase.java
+++ /dev/null
@@ -1,362 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.functions;
-
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.catalog.Catalog;
-import org.apache.flink.table.catalog.CatalogFunction;
-import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.functions.ScalarFunction;
-import org.apache.flink.types.Row;
-
-import org.junit.Test;
-
-import java.util.Arrays;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * Tests for {@link CatalogFunction}.
- */
-public abstract class FunctionTestBase {
-       protected static TableEnvironment tableEnv;
-
-       @Test
-       public void testCreateCatalogFunctionInDefaultCatalog() {
-               String ddl1 = "create function f1 as 
'org.apache.flink.function.TestFunction'";
-               tableEnv.sqlUpdate(ddl1);
-               
assertTrue(Arrays.asList(tableEnv.listFunctions()).contains("f1"));
-
-               tableEnv.sqlUpdate("DROP FUNCTION IF EXISTS 
default_catalog.default_database.f1");
-               
assertFalse(Arrays.asList(tableEnv.listFunctions()).contains("f1"));
-       }
-
-       @Test
-       public void testCreateFunctionWithFullPath() {
-               String ddl1 = "create function 
default_catalog.default_database.f2 as" +
-                       " 'org.apache.flink.function.TestFunction'";
-               tableEnv.sqlUpdate(ddl1);
-               
assertTrue(Arrays.asList(tableEnv.listFunctions()).contains("f2"));
-
-               tableEnv.sqlUpdate("DROP FUNCTION IF EXISTS 
default_catalog.default_database.f2");
-               
assertFalse(Arrays.asList(tableEnv.listFunctions()).contains("f2"));
-       }
-
-       @Test
-       public void testCreateFunctionWithoutCatalogIdentifier() {
-               String ddl1 = "create function default_database.f3 as" +
-                       " 'org.apache.flink.function.TestFunction'";
-               tableEnv.sqlUpdate(ddl1);
-               
assertTrue(Arrays.asList(tableEnv.listFunctions()).contains("f3"));
-
-               tableEnv.sqlUpdate("DROP FUNCTION IF EXISTS 
default_catalog.default_database.f3");
-               
assertFalse(Arrays.asList(tableEnv.listFunctions()).contains("f3"));
-       }
-
-       @Test
-       public void testCreateFunctionCatalogNotExists() {
-
-               String ddl1 = "create function catalog1.database1.f3 as 
'org.apache.flink.function.TestFunction'";
-
-               try {
-                       tableEnv.sqlUpdate(ddl1);
-               } catch (Exception e){
-                       assertTrue(e.getMessage().equals("Catalog catalog1 does 
not exist"));
-               }
-       }
-
-       @Test
-       public void testCreateFunctionDBNotExists() {
-               String ddl1 = "create function default_catalog.database1.f3 as 
'org.apache.flink.function.TestFunction'";
-
-               try {
-                       tableEnv.sqlUpdate(ddl1);
-               } catch (Exception e){
-                       assertEquals(e.getMessage(), "Could not execute CREATE 
CATALOG FUNCTION:" +
-                               " (catalogFunction: [Optional[This is a 
user-defined function]], identifier:" +
-                               " [`default_catalog`.`database1`.`f3`], 
ignoreIfExists: [false])");
-               }
-       }
-
-       @Test
-       public void testCreateTemporaryCatalogFunction() {
-               String ddl1 = "create temporary function 
default_catalog.default_database.f4" +
-                       " as 
'org.apache.flink.table.planner.functions.FunctionTestBase$TestUDF'";
-
-               String ddl2 = "create temporary function if not exists 
default_catalog.default_database.f4" +
-                       " as 
'org.apache.flink.table.planner.functions.FunctionTestBase$TestUDF'";
-
-               String ddl3 = "drop temporary function 
default_catalog.default_database.f4";
-
-               String ddl4 = "drop temporary function if exists 
default_catalog.default_database.f4";
-
-               tableEnv.sqlUpdate(ddl1);
-               
assertTrue(Arrays.asList(tableEnv.listFunctions()).contains("f4"));
-
-               tableEnv.sqlUpdate(ddl2);
-               
assertTrue(Arrays.asList(tableEnv.listFunctions()).contains("f4"));
-
-               tableEnv.sqlUpdate(ddl3);
-               
assertFalse(Arrays.asList(tableEnv.listFunctions()).contains("f4"));
-
-               tableEnv.sqlUpdate(ddl1);
-               try {
-                       tableEnv.sqlUpdate(ddl1);
-               } catch (Exception e) {
-                       assertTrue(e instanceof ValidationException);
-                       assertEquals(e.getMessage(), "Temporary catalog 
function " +
-                               "`default_catalog`.`default_database`.`f4` is 
already defined");
-               }
-
-               tableEnv.sqlUpdate(ddl3);
-               tableEnv.sqlUpdate(ddl4);
-               try {
-                       tableEnv.sqlUpdate(ddl3);
-               } catch (Exception e) {
-                       assertTrue(e instanceof ValidationException);
-                       assertEquals(e.getMessage(),
-                               "Temporary catalog function 
`default_catalog`.`default_database`.`f4` doesn't exist");
-               }
-       }
-
-       @Test
-       public void testCreateTemporarySystemFunction() {
-               String ddl1 = "create temporary system function 
default_catalog.default_database.f5" +
-                       " as 
'org.apache.flink.table.planner.functions.FunctionTestBase$TestUDF'";
-
-               String ddl2 = "create temporary system function if not exists 
default_catalog.default_database.f5" +
-                       " as 
'org.apache.flink.table.planner.functions.FunctionTestBase$TestUDF'";
-
-               String ddl3 = "drop temporary system function 
default_catalog.default_database.f5";
-
-               tableEnv.sqlUpdate(ddl1);
-               tableEnv.sqlUpdate(ddl2);
-               tableEnv.sqlUpdate(ddl3);
-       }
-
-       @Test
-       public void testAlterFunction() throws Exception {
-               String create = "create function f3 as 
'org.apache.flink.function.TestFunction'";
-               String alter = "alter function f3 as 
'org.apache.flink.function.TestFunction2'";
-
-               ObjectPath objectPath = new ObjectPath("default_database", 
"f3");
-               Catalog catalog = tableEnv.getCatalog("default_catalog").get();
-               tableEnv.sqlUpdate(create);
-               CatalogFunction beforeUpdate = catalog.getFunction(objectPath);
-               assertEquals("org.apache.flink.function.TestFunction", 
beforeUpdate.getClassName());
-
-               tableEnv.sqlUpdate(alter);
-               CatalogFunction afterUpdate = catalog.getFunction(objectPath);
-               assertEquals("org.apache.flink.function.TestFunction2", 
afterUpdate.getClassName());
-       }
-
-       @Test
-       public void testAlterFunctionNonExists() {
-               String alterUndefinedFunction = "ALTER FUNCTION 
default_catalog.default_database.f4" +
-                       " as 'org.apache.flink.function.TestFunction'";
-
-               String alterFunctionInWrongCatalog = "ALTER FUNCTION 
catalog1.default_database.f4 " +
-                       "as 'org.apache.flink.function.TestFunction'";
-
-               String alterFunctionInWrongDB = "ALTER FUNCTION 
default_catalog.db1.f4 " +
-                       "as 'org.apache.flink.function.TestFunction'";
-
-               try {
-                       tableEnv.sqlUpdate(alterUndefinedFunction);
-                       fail();
-               } catch (Exception e){
-                       assertEquals(e.getMessage(),
-                               "Function default_database.f4 does not exist in 
Catalog default_catalog.");
-               }
-
-               try {
-                       tableEnv.sqlUpdate(alterFunctionInWrongCatalog);
-                       fail();
-               } catch (Exception e) {
-                       assertTrue(e.getMessage().equals("Catalog catalog1 does 
not exist"));
-               }
-
-               try {
-                       tableEnv.sqlUpdate(alterFunctionInWrongDB);
-                       fail();
-               } catch (Exception e) {
-                       assertEquals(e.getMessage(),
-                               "Function db1.f4 does not exist in Catalog 
default_catalog.");
-               }
-       }
-
-       @Test
-       public void testAlterTemporaryCatalogFunction() {
-               String alterTemporary = "ALTER TEMPORARY FUNCTION 
default_catalog.default_database.f4" +
-                       " as 'org.apache.flink.function.TestFunction'";
-
-               try {
-                       tableEnv.sqlUpdate(alterTemporary);
-                       fail();
-               } catch (Exception e) {
-                       assertTrue(e.getMessage().equals("Alter temporary 
catalog function is not supported"));
-               }
-
-       }
-
-       @Test
-       public void testAlterTemporarySystemFunction() {
-               String alterTemporary = "ALTER TEMPORARY SYSTEM FUNCTION 
default_catalog.default_database.f4" +
-                       " as 'org.apache.flink.function.TestFunction'";
-
-               try {
-                       tableEnv.sqlUpdate(alterTemporary);
-                       fail();
-               } catch (Exception e) {
-                       assertTrue(e.getMessage().equals("Alter temporary 
system function is not supported"));
-               }
-       }
-
-       @Test
-       public void testDropFunctionNonExists() {
-               String dropUndefinedFunction = "DROP FUNCTION 
default_catalog.default_database.f4";
-
-               String dropFunctionInWrongCatalog = "DROP FUNCTION 
catalog1.default_database.f4";
-
-               String dropFunctionInWrongDB = "DROP FUNCTION 
default_catalog.db1.f4";
-
-               try {
-                       tableEnv.sqlUpdate(dropUndefinedFunction);
-                       fail();
-               } catch (Exception e){
-                       assertEquals(e.getMessage(),
-                               "Function default_database.f4 does not exist in 
Catalog default_catalog.");
-               }
-
-               try {
-                       tableEnv.sqlUpdate(dropFunctionInWrongCatalog);
-                       fail();
-               } catch (Exception e) {
-                       assertTrue(e.getMessage().equals("Catalog catalog1 does 
not exist"));
-               }
-
-               try {
-                       tableEnv.sqlUpdate(dropFunctionInWrongDB);
-                       fail();
-               } catch (Exception e) {
-                       assertEquals(e.getMessage(), "Function db1.f4 does not 
exist in Catalog default_catalog.");
-               }
-       }
-
-       @Test
-       public void testDropTemporaryFunctionNonExits() {
-               String dropUndefinedFunction = "DROP TEMPORARY FUNCTION 
default_catalog.default_database.f4";
-               String dropFunctionInWrongCatalog = "DROP TEMPORARY FUNCTION 
catalog1.default_database.f4";
-               String dropFunctionInWrongDB = "DROP TEMPORARY FUNCTION 
default_catalog.db1.f4";
-
-               try {
-                       tableEnv.sqlUpdate(dropUndefinedFunction);
-                       fail();
-               } catch (Exception e){
-                       assertEquals(e.getMessage(),
-                               "Temporary catalog function 
`default_catalog`.`default_database`.`f4` doesn't exist");
-               }
-
-               try {
-                       tableEnv.sqlUpdate(dropFunctionInWrongCatalog);
-                       fail();
-               } catch (Exception e) {
-                       assertEquals(e.getMessage(),
-                               "Temporary catalog function 
`catalog1`.`default_database`.`f4` doesn't exist");
-               }
-
-               try {
-                       tableEnv.sqlUpdate(dropFunctionInWrongDB);
-                       fail();
-               } catch (Exception e) {
-                       assertEquals(e.getMessage(), "Temporary catalog 
function `default_catalog`.`db1`.`f4` doesn't exist");
-               }
-       }
-
-       @Test
-       public void 
testCreateAlterDropTemporaryCatalogFunctionsWithDifferentIdentifier() {
-               String createNoCatalogDB = "create temporary function f4" +
-                       " as 
'org.apache.flink.table.planner.functions.FunctionTestBase$TestUDF'";
-
-               String dropNoCatalogDB = "drop temporary function f4";
-
-               tableEnv.sqlUpdate(createNoCatalogDB);
-               tableEnv.sqlUpdate(dropNoCatalogDB);
-
-               String createNonExistsCatalog = "create temporary function 
catalog1.default_database.f4" +
-                       " as 
'org.apache.flink.table.planner.functions.FunctionTestBase$TestUDF'";
-
-               String dropNonExistsCatalog = "drop temporary function 
catalog1.default_database.f4";
-
-               tableEnv.sqlUpdate(createNonExistsCatalog);
-               tableEnv.sqlUpdate(dropNonExistsCatalog);
-
-               String createNonExistsDB = "create temporary function 
default_catalog.db1.f4" +
-                       " as 
'org.apache.flink.table.planner.functions.FunctionTestBase$TestUDF'";
-
-               String dropNonExistsDB = "drop temporary function 
default_catalog.db1.f4";
-
-               tableEnv.sqlUpdate(createNonExistsDB);
-               tableEnv.sqlUpdate(dropNonExistsDB);
-       }
-
-       @Test
-       public void testDropTemporarySystemFunction() {
-               String ddl1 = "create temporary system function f5" +
-                       " as 
'org.apache.flink.table.planner.functions.FunctionTestBase$TestUDF'";
-
-               String ddl2 = "drop temporary system function f5";
-
-               String ddl3 = "drop temporary system function if exists f5";
-
-               tableEnv.sqlUpdate(ddl1);
-               tableEnv.sqlUpdate(ddl2);
-               tableEnv.sqlUpdate(ddl3);
-
-               try {
-                       tableEnv.sqlUpdate(ddl2);
-               } catch (Exception e) {
-                       assertEquals(e.getMessage(), "Temporary system function 
f5 doesn't exist");
-               }
-       }
-
-       protected Row toRow(Object ... objects) {
-               Row row = new Row(objects.length);
-               for (int i = 0; i < objects.length; i++) {
-                       row.setField(i, objects[i]);
-               }
-
-               return row;
-       }
-
-       /**
-        * Test udf class.
-        */
-       public static class TestUDF extends ScalarFunction {
-
-               public Integer eval(Integer a, Integer b) {
-                       return a + b;
-               }
-       }
-}
-
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/FunctionITCase.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/FunctionITCase.java
deleted file mode 100644
index a43c36a..0000000
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/FunctionITCase.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.runtime.batch.sql;
-
-import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.planner.functions.FunctionTestBase;
-import org.apache.flink.table.planner.utils.TestingTableEnvironment;
-
-import org.junit.BeforeClass;
-
-/**
- * Tests for catalog and system functions in batch table environment.
- */
-public class
-FunctionITCase extends FunctionTestBase {
-
-       @BeforeClass
-       public static void setup() {
-               EnvironmentSettings environmentSettings =
-                       
EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
-               tableEnv = TestingTableEnvironment.create(environmentSettings);
-       }
-}
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
index 8aa8d68..310647a 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
@@ -18,14 +18,16 @@
 
 package org.apache.flink.table.planner.runtime.stream.sql;
 
-import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.functions.ScalarFunction;
 import 
org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory;
-import org.apache.flink.table.planner.functions.FunctionTestBase;
-import org.apache.flink.table.planner.utils.TestingTableEnvironment;
+import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
 import org.apache.flink.types.Row;
 
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -33,82 +35,388 @@ import java.util.Arrays;
 import java.util.List;
 
 import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * Tests for catalog and system in stream table environment.
  */
-public class FunctionITCase extends FunctionTestBase {
+public class FunctionITCase extends StreamingTestBase {
 
-       @BeforeClass
-       public static void setup() {
-               EnvironmentSettings environmentSettings =
-                       
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
-               tableEnv = TestingTableEnvironment.create(environmentSettings);
+       private static final String TEST_FUNCTION = TestUDF.class.getName();
+
+       @Test
+       public void testCreateCatalogFunctionInDefaultCatalog() {
+               String ddl1 = "create function f1 as 
'org.apache.flink.function.TestFunction'";
+               tEnv().sqlUpdate(ddl1);
+               
assertTrue(Arrays.asList(tEnv().listFunctions()).contains("f1"));
+
+               tEnv().sqlUpdate("DROP FUNCTION IF EXISTS 
default_catalog.default_database.f1");
+               
assertFalse(Arrays.asList(tEnv().listFunctions()).contains("f1"));
+       }
+
+       @Test
+       public void testCreateFunctionWithFullPath() {
+               String ddl1 = "create function 
default_catalog.default_database.f2 as" +
+                       " 'org.apache.flink.function.TestFunction'";
+               tEnv().sqlUpdate(ddl1);
+               
assertTrue(Arrays.asList(tEnv().listFunctions()).contains("f2"));
+
+               tEnv().sqlUpdate("DROP FUNCTION IF EXISTS 
default_catalog.default_database.f2");
+               
assertFalse(Arrays.asList(tEnv().listFunctions()).contains("f2"));
+       }
+
+       @Test
+       public void testCreateFunctionWithoutCatalogIdentifier() {
+               String ddl1 = "create function default_database.f3 as" +
+                       " 'org.apache.flink.function.TestFunction'";
+               tEnv().sqlUpdate(ddl1);
+               
assertTrue(Arrays.asList(tEnv().listFunctions()).contains("f3"));
+
+               tEnv().sqlUpdate("DROP FUNCTION IF EXISTS 
default_catalog.default_database.f3");
+               
assertFalse(Arrays.asList(tEnv().listFunctions()).contains("f3"));
+       }
+
+       @Test
+       public void testCreateFunctionCatalogNotExists() {
+               String ddl1 = "create function catalog1.database1.f3 as 
'org.apache.flink.function.TestFunction'";
+
+               try {
+                       tEnv().sqlUpdate(ddl1);
+               } catch (Exception e){
+                       assertEquals("Catalog catalog1 does not exist", 
e.getMessage());
+               }
+       }
+
+       @Test
+       public void testCreateFunctionDBNotExists() {
+               String ddl1 = "create function default_catalog.database1.f3 as 
'org.apache.flink.function.TestFunction'";
+
+               try {
+                       tEnv().sqlUpdate(ddl1);
+               } catch (Exception e){
+                       assertEquals(e.getMessage(), "Could not execute CREATE 
CATALOG FUNCTION:" +
+                               " (catalogFunction: [Optional[This is a 
user-defined function]], identifier:" +
+                               " [`default_catalog`.`database1`.`f3`], 
ignoreIfExists: [false])");
+               }
+       }
+
+       @Test
+       public void testCreateTemporaryCatalogFunction() {
+               String ddl1 = "create temporary function 
default_catalog.default_database.f4" +
+                       " as '" + TEST_FUNCTION + "'";
+
+               String ddl2 = "create temporary function if not exists 
default_catalog.default_database.f4" +
+                       " as '" + TEST_FUNCTION + "'";
+
+               String ddl3 = "drop temporary function 
default_catalog.default_database.f4";
+
+               String ddl4 = "drop temporary function if exists 
default_catalog.default_database.f4";
+
+               tEnv().sqlUpdate(ddl1);
+               
assertTrue(Arrays.asList(tEnv().listFunctions()).contains("f4"));
+
+               tEnv().sqlUpdate(ddl2);
+               
assertTrue(Arrays.asList(tEnv().listFunctions()).contains("f4"));
+
+               tEnv().sqlUpdate(ddl3);
+               
assertFalse(Arrays.asList(tEnv().listFunctions()).contains("f4"));
+
+               tEnv().sqlUpdate(ddl1);
+               try {
+                       tEnv().sqlUpdate(ddl1);
+               } catch (Exception e) {
+                       assertTrue(e instanceof ValidationException);
+                       assertEquals(e.getMessage(),
+                               "Temporary catalog function 
`default_catalog`.`default_database`.`f4`" +
+                                       " is already defined");
+               }
+
+               tEnv().sqlUpdate(ddl3);
+               tEnv().sqlUpdate(ddl4);
+               try {
+                       tEnv().sqlUpdate(ddl3);
+               } catch (Exception e) {
+                       assertTrue(e instanceof ValidationException);
+                       assertEquals(e.getMessage(),
+                               "Temporary catalog function 
`default_catalog`.`default_database`.`f4`" +
+                                       " doesn't exist");
+               }
+       }
+
+       @Test
+       public void testCreateTemporarySystemFunction() {
+               String ddl1 = "create temporary system function 
default_catalog.default_database.f5" +
+                       " as '" + TEST_FUNCTION + "'";
+
+               String ddl2 = "create temporary system function if not exists 
default_catalog.default_database.f5" +
+                       " as '" + TEST_FUNCTION + "'";
+
+               String ddl3 = "drop temporary system function 
default_catalog.default_database.f5";
+
+               tEnv().sqlUpdate(ddl1);
+               tEnv().sqlUpdate(ddl2);
+               tEnv().sqlUpdate(ddl3);
+       }
+
+       @Test
+       public void testAlterFunction() throws Exception {
+               String create = "create function f3 as 
'org.apache.flink.function.TestFunction'";
+               String alter = "alter function f3 as 
'org.apache.flink.function.TestFunction2'";
+
+               ObjectPath objectPath = new ObjectPath("default_database", 
"f3");
+               assertTrue(tEnv().getCatalog("default_catalog").isPresent());
+               Catalog catalog = tEnv().getCatalog("default_catalog").get();
+               tEnv().sqlUpdate(create);
+               CatalogFunction beforeUpdate = catalog.getFunction(objectPath);
+               assertEquals("org.apache.flink.function.TestFunction", 
beforeUpdate.getClassName());
+
+               tEnv().sqlUpdate(alter);
+               CatalogFunction afterUpdate = catalog.getFunction(objectPath);
+               assertEquals("org.apache.flink.function.TestFunction2", 
afterUpdate.getClassName());
+       }
+
+       @Test
+       public void testAlterFunctionNonExists() {
+               String alterUndefinedFunction = "ALTER FUNCTION 
default_catalog.default_database.f4" +
+                       " as 'org.apache.flink.function.TestFunction'";
+
+               String alterFunctionInWrongCatalog = "ALTER FUNCTION 
catalog1.default_database.f4 " +
+                       "as 'org.apache.flink.function.TestFunction'";
+
+               String alterFunctionInWrongDB = "ALTER FUNCTION 
default_catalog.db1.f4 " +
+                       "as 'org.apache.flink.function.TestFunction'";
+
+               try {
+                       tEnv().sqlUpdate(alterUndefinedFunction);
+                       fail();
+               } catch (Exception e){
+                       assertEquals(e.getMessage(),
+                               "Function default_database.f4 does not exist in 
Catalog default_catalog.");
+               }
+
+               try {
+                       tEnv().sqlUpdate(alterFunctionInWrongCatalog);
+                       fail();
+               } catch (Exception e) {
+                       assertEquals("Catalog catalog1 does not exist", 
e.getMessage());
+               }
+
+               try {
+                       tEnv().sqlUpdate(alterFunctionInWrongDB);
+                       fail();
+               } catch (Exception e) {
+                       assertEquals(e.getMessage(), "Function db1.f4 does not 
exist" +
+                               " in Catalog default_catalog.");
+               }
+       }
+
+       @Test
+       public void testAlterTemporaryCatalogFunction() {
+               String alterTemporary = "ALTER TEMPORARY FUNCTION 
default_catalog.default_database.f4" +
+                       " as 'org.apache.flink.function.TestFunction'";
+
+               try {
+                       tEnv().sqlUpdate(alterTemporary);
+                       fail();
+               } catch (Exception e) {
+                       assertEquals("Alter temporary catalog function is not 
supported", e.getMessage());
+               }
+       }
+
+       @Test
+       public void testAlterTemporarySystemFunction() {
+               String alterTemporary = "ALTER TEMPORARY SYSTEM FUNCTION 
default_catalog.default_database.f4" +
+                       " as 'org.apache.flink.function.TestFunction'";
+
+               try {
+                       tEnv().sqlUpdate(alterTemporary);
+                       fail();
+               } catch (Exception e) {
+                       assertEquals("Alter temporary system function is not 
supported", e.getMessage());
+               }
+       }
+
+       @Test
+       public void testDropFunctionNonExists() {
+               String dropUndefinedFunction = "DROP FUNCTION 
default_catalog.default_database.f4";
+
+               String dropFunctionInWrongCatalog = "DROP FUNCTION 
catalog1.default_database.f4";
+
+               String dropFunctionInWrongDB = "DROP FUNCTION 
default_catalog.db1.f4";
+
+               try {
+                       tEnv().sqlUpdate(dropUndefinedFunction);
+                       fail();
+               } catch (Exception e){
+                       assertEquals(e.getMessage(),
+                               "Function default_database.f4 does not exist in 
Catalog default_catalog.");
+               }
+
+               try {
+                       tEnv().sqlUpdate(dropFunctionInWrongCatalog);
+                       fail();
+               } catch (Exception e) {
+                       assertEquals("Catalog catalog1 does not exist", 
e.getMessage());
+               }
+
+               try {
+                       tEnv().sqlUpdate(dropFunctionInWrongDB);
+                       fail();
+               } catch (Exception e) {
+                       assertEquals(e.getMessage(),
+                               "Function db1.f4 does not exist in Catalog 
default_catalog.");
+               }
+       }
+
+       @Test
+       public void testDropTemporaryFunctionNonExits() {
+               String dropUndefinedFunction = "DROP TEMPORARY FUNCTION 
default_catalog.default_database.f4";
+               String dropFunctionInWrongCatalog = "DROP TEMPORARY FUNCTION 
catalog1.default_database.f4";
+               String dropFunctionInWrongDB = "DROP TEMPORARY FUNCTION 
default_catalog.db1.f4";
+
+               try {
+                       tEnv().sqlUpdate(dropUndefinedFunction);
+                       fail();
+               } catch (Exception e){
+                       assertEquals(e.getMessage(), "Temporary catalog 
function" +
+                               " `default_catalog`.`default_database`.`f4` 
doesn't exist");
+               }
+
+               try {
+                       tEnv().sqlUpdate(dropFunctionInWrongCatalog);
+                       fail();
+               } catch (Exception e) {
+                       assertEquals(e.getMessage(), "Temporary catalog 
function " +
+                               "`catalog1`.`default_database`.`f4` doesn't 
exist");
+               }
+
+               try {
+                       tEnv().sqlUpdate(dropFunctionInWrongDB);
+                       fail();
+               } catch (Exception e) {
+                       assertEquals(e.getMessage(), "Temporary catalog 
function " +
+                               "`default_catalog`.`db1`.`f4` doesn't exist");
+               }
        }
 
        @Test
-       public void testUseDefinedRegularCatalogFunction() throws Exception {
-               String functionDDL = "create function addOne as " +
-                       
"'org.apache.flink.table.planner.functions.FunctionTestBase$TestUDF'";
+       public void 
testCreateDropTemporaryCatalogFunctionsWithDifferentIdentifier() {
+               String createNoCatalogDB = "create temporary function f4" +
+                       " as '" + TEST_FUNCTION + "'";
+
+               String dropNoCatalogDB = "drop temporary function f4";
+
+               tEnv().sqlUpdate(createNoCatalogDB);
+               tEnv().sqlUpdate(dropNoCatalogDB);
+
+               String createNonExistsCatalog = "create temporary function 
catalog1.default_database.f4" +
+                       " as '" + TEST_FUNCTION + "'";
+
+               String dropNonExistsCatalog = "drop temporary function 
catalog1.default_database.f4";
+
+               tEnv().sqlUpdate(createNonExistsCatalog);
+               tEnv().sqlUpdate(dropNonExistsCatalog);
+
+               String createNonExistsDB = "create temporary function 
default_catalog.db1.f4" +
+                       " as '" + TEST_FUNCTION + "'";
+
+               String dropNonExistsDB = "drop temporary function 
default_catalog.db1.f4";
+
+               tEnv().sqlUpdate(createNonExistsDB);
+               tEnv().sqlUpdate(dropNonExistsDB);
+       }
+
+       @Test
+       public void testDropTemporarySystemFunction() {
+               String ddl1 = "create temporary system function f5 as '" + 
TEST_FUNCTION + "'";
+
+               String ddl2 = "drop temporary system function f5";
+
+               String ddl3 = "drop temporary system function if exists f5";
+
+               tEnv().sqlUpdate(ddl1);
+               tEnv().sqlUpdate(ddl2);
+               tEnv().sqlUpdate(ddl3);
+
+               try {
+                       tEnv().sqlUpdate(ddl2);
+               } catch (Exception e) {
+                       assertEquals(e.getMessage(), "Temporary system function 
f5 doesn't exist");
+               }
+       }
+
+       @Test
+       public void testUserDefinedRegularCatalogFunction() throws Exception {
+               String functionDDL = "create function addOne as '" + 
TEST_FUNCTION + "'";
 
                String dropFunctionDDL = "drop function addOne";
-               testUseDefinedCatalogFunction(functionDDL);
+               testUserDefinedCatalogFunction(functionDDL);
                // delete the function
-               tableEnv.sqlUpdate(dropFunctionDDL);
+               tEnv().sqlUpdate(dropFunctionDDL);
        }
 
        @Test
-       public void testUseDefinedTemporaryCatalogFunction() throws Exception {
-               String functionDDL = "create temporary function addOne as " +
-                       
"'org.apache.flink.table.planner.functions.FunctionTestBase$TestUDF'";
+       public void testUserDefinedTemporaryCatalogFunction() throws Exception {
+               String functionDDL = "create temporary function addOne as '" + 
TEST_FUNCTION + "'";
 
                String dropFunctionDDL = "drop temporary function addOne";
-               testUseDefinedCatalogFunction(functionDDL);
+               testUserDefinedCatalogFunction(functionDDL);
                // delete the function
-               tableEnv.sqlUpdate(dropFunctionDDL);
+               tEnv().sqlUpdate(dropFunctionDDL);
        }
 
        @Test
-       public void testUseDefinedTemporarySystemFunction() throws Exception {
-               String functionDDL = "create temporary system function addOne 
as " +
-                       
"'org.apache.flink.table.planner.functions.FunctionTestBase$TestUDF'";
+       public void testUserDefinedTemporarySystemFunction() throws Exception {
+               String functionDDL = "create temporary system function addOne 
as '" + TEST_FUNCTION + "'";
 
                String dropFunctionDDL = "drop temporary system function 
addOne";
-               testUseDefinedCatalogFunction(functionDDL);
+               testUserDefinedCatalogFunction(functionDDL);
                // delete the function
-               tableEnv.sqlUpdate(dropFunctionDDL);
+               tEnv().sqlUpdate(dropFunctionDDL);
+       }
+
+       /**
+        * Test udf class.
+        */
+       public static class TestUDF extends ScalarFunction {
+
+               public Integer eval(Integer a, Integer b) {
+                       return a + b;
+               }
        }
 
-       // This test case only works for stream mode
-       private void testUseDefinedCatalogFunction(String createFunctionDDL) 
throws Exception {
+       private void testUserDefinedCatalogFunction(String createFunctionDDL) 
throws Exception {
                List<Row> sourceData = Arrays.asList(
-                       toRow(1, "1000", 2),
-                       toRow(2, "1", 3),
-                       toRow(3, "2000", 4),
-                       toRow(1, "2", 2),
-                       toRow(2, "3000", 3)
+                       Row.of(1, "1000", 2),
+                       Row.of(2, "1", 3),
+                       Row.of(3, "2000", 4),
+                       Row.of(1, "2", 2),
+                       Row.of(2, "3000", 3)
                );
 
                TestCollectionTableFactory.reset();
-               TestCollectionTableFactory.initData(sourceData, new 
ArrayList<Row>(), -1);
+               TestCollectionTableFactory.initData(sourceData, new 
ArrayList<>(), -1);
 
                String sourceDDL = "create table t1(a int, b varchar, c int) 
with ('connector' = 'COLLECTION')";
                String sinkDDL = "create table t2(a int, b varchar, c int) with 
('connector' = 'COLLECTION')";
 
                String query = "select t1.a, t1.b, addOne(t1.a, 1) as c from 
t1";
 
-               tableEnv.sqlUpdate(sourceDDL);
-               tableEnv.sqlUpdate(sinkDDL);
-               tableEnv.sqlUpdate(createFunctionDDL);
-               Table t2 = tableEnv.sqlQuery(query);
-               tableEnv.insertInto("t2", t2);
-               tableEnv.execute("job1");
+               tEnv().sqlUpdate(sourceDDL);
+               tEnv().sqlUpdate(sinkDDL);
+               tEnv().sqlUpdate(createFunctionDDL);
+               Table t2 = tEnv().sqlQuery(query);
+               tEnv().insertInto("t2", t2);
+               tEnv().execute("job1");
 
                Row[] result = TestCollectionTableFactory.RESULT().toArray(new 
Row[0]);
                Row[] expected = sourceData.toArray(new Row[0]);
                assertArrayEquals(expected, result);
 
-               tableEnv.sqlUpdate("drop table t1");
-               tableEnv.sqlUpdate("drop table t2");
+               tEnv().sqlUpdate("drop table t1");
+               tEnv().sqlUpdate("drop table t2");
        }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/functions/FunctionTestBase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/functions/FunctionTestBase.java
deleted file mode 100644
index 5815da6..0000000
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/functions/FunctionTestBase.java
+++ /dev/null
@@ -1,436 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.functions;
-
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.catalog.Catalog;
-import org.apache.flink.table.catalog.CatalogFunction;
-import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.factories.utils.TestCollectionTableFactory;
-import org.apache.flink.types.Row;
-
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * Tests for both catalog and system function.
- */
-public abstract class FunctionTestBase {
-       private static TableEnvironment tableEnv;
-
-       public static void setTableEnv(TableEnvironment e) {
-               tableEnv = e;
-       }
-
-       public abstract void execute() throws Exception;
-
-       @Test
-       public void testCreateCatalogFunctionInDefaultCatalog() {
-               String ddl1 = "create function f1 as 
'org.apache.flink.function.TestFunction'";
-               tableEnv.sqlUpdate(ddl1);
-               
assertTrue(Arrays.asList(tableEnv.listFunctions()).contains("f1"));
-
-               tableEnv.sqlUpdate("DROP FUNCTION IF EXISTS 
default_catalog.default_database.f1");
-               
assertFalse(Arrays.asList(tableEnv.listFunctions()).contains("f1"));
-       }
-
-       @Test
-       public void testCreateFunctionWithFullPath() {
-               String ddl1 = "create function 
default_catalog.default_database.f2 as" +
-                       " 'org.apache.flink.function.TestFunction'";
-               tableEnv.sqlUpdate(ddl1);
-               
assertTrue(Arrays.asList(tableEnv.listFunctions()).contains("f2"));
-
-               tableEnv.sqlUpdate("DROP FUNCTION IF EXISTS 
default_catalog.default_database.f2");
-               
assertFalse(Arrays.asList(tableEnv.listFunctions()).contains("f2"));
-       }
-
-       @Test
-       public void testCreateFunctionWithoutCatalogIdentifier() {
-               String ddl1 = "create function default_database.f3 as" +
-                       " 'org.apache.flink.function.TestFunction'";
-               tableEnv.sqlUpdate(ddl1);
-               
assertTrue(Arrays.asList(tableEnv.listFunctions()).contains("f3"));
-
-               tableEnv.sqlUpdate("DROP FUNCTION IF EXISTS 
default_catalog.default_database.f3");
-               
assertFalse(Arrays.asList(tableEnv.listFunctions()).contains("f3"));
-       }
-
-       @Test
-       public void testCreateFunctionCatalogNotExists() {
-
-               String ddl1 = "create function catalog1.database1.f3 as 
'org.apache.flink.function.TestFunction'";
-
-               try {
-                       tableEnv.sqlUpdate(ddl1);
-               } catch (Exception e){
-                       assertTrue(e.getMessage().equals("Catalog catalog1 does 
not exist"));
-               }
-       }
-
-       @Test
-       public void testCreateFunctionDBNotExists() {
-               String ddl1 = "create function default_catalog.database1.f3 as 
'org.apache.flink.function.TestFunction'";
-
-               try {
-                       tableEnv.sqlUpdate(ddl1);
-               } catch (Exception e){
-                       assertEquals(e.getMessage(), "Could not execute CREATE 
CATALOG FUNCTION:" +
-                               " (catalogFunction: [Optional[This is a 
user-defined function]], identifier:" +
-                               " [`default_catalog`.`database1`.`f3`], 
ignoreIfExists: [false])");
-               }
-       }
-
-       @Test
-       public void testCreateTemporaryCatalogFunction() {
-               String ddl1 = "create temporary function 
default_catalog.default_database.f4" +
-                       " as 
'org.apache.flink.table.functions.FunctionTestBase$TestUDF'";
-
-               String ddl2 = "create temporary function if not exists 
default_catalog.default_database.f4" +
-                       " as 
'org.apache.flink.table.functions.FunctionTestBase$TestUDF'";
-
-               String ddl3 = "drop temporary function 
default_catalog.default_database.f4";
-
-               String ddl4 = "drop temporary function if exists 
default_catalog.default_database.f4";
-
-               tableEnv.sqlUpdate(ddl1);
-               
assertTrue(Arrays.asList(tableEnv.listFunctions()).contains("f4"));
-
-               tableEnv.sqlUpdate(ddl2);
-               
assertTrue(Arrays.asList(tableEnv.listFunctions()).contains("f4"));
-
-               tableEnv.sqlUpdate(ddl3);
-               
assertFalse(Arrays.asList(tableEnv.listFunctions()).contains("f4"));
-
-               tableEnv.sqlUpdate(ddl1);
-               try {
-                       tableEnv.sqlUpdate(ddl1);
-               } catch (Exception e) {
-                       assertTrue(e instanceof ValidationException);
-                       assertEquals(e.getMessage(),
-                               "Temporary catalog function 
`default_catalog`.`default_database`.`f4`" +
-                                       " is already defined");
-               }
-
-               tableEnv.sqlUpdate(ddl3);
-               tableEnv.sqlUpdate(ddl4);
-               try {
-                       tableEnv.sqlUpdate(ddl3);
-               } catch (Exception e) {
-                       assertTrue(e instanceof ValidationException);
-                       assertEquals(e.getMessage(),
-                               "Temporary catalog function 
`default_catalog`.`default_database`.`f4`" +
-                                       " doesn't exist");
-               }
-       }
-
-       @Test
-       public void testCreateTemporarySystemFunction() {
-               String ddl1 = "create temporary system function 
default_catalog.default_database.f5" +
-                       " as 
'org.apache.flink.table.functions.FunctionTestBase$TestUDF'";
-
-               String ddl2 = "create temporary system function if not exists 
default_catalog.default_database.f5" +
-                       " as 
'org.apache.flink.table.functions.CatalogFunctionTestBase$TestUDF'";
-
-               String ddl3 = "drop temporary system function 
default_catalog.default_database.f5";
-
-               tableEnv.sqlUpdate(ddl1);
-               tableEnv.sqlUpdate(ddl2);
-               tableEnv.sqlUpdate(ddl3);
-       }
-
-       @Test
-       public void testAlterFunction() throws Exception {
-               String create = "create function f3 as 
'org.apache.flink.function.TestFunction'";
-               String alter = "alter function f3 as 
'org.apache.flink.function.TestFunction2'";
-
-               ObjectPath objectPath = new ObjectPath("default_database", 
"f3");
-               Catalog catalog = tableEnv.getCatalog("default_catalog").get();
-               tableEnv.sqlUpdate(create);
-               CatalogFunction beforeUpdate = catalog.getFunction(objectPath);
-               assertEquals("org.apache.flink.function.TestFunction", 
beforeUpdate.getClassName());
-
-               tableEnv.sqlUpdate(alter);
-               CatalogFunction afterUpdate = catalog.getFunction(objectPath);
-               assertEquals("org.apache.flink.function.TestFunction2", 
afterUpdate.getClassName());
-       }
-
-       @Test
-       public void testAlterFunctionNonExists() {
-               String alterUndefinedFunction = "ALTER FUNCTION 
default_catalog.default_database.f4" +
-                       " as 'org.apache.flink.function.TestFunction'";
-
-               String alterFunctionInWrongCatalog = "ALTER FUNCTION 
catalog1.default_database.f4 " +
-                       "as 'org.apache.flink.function.TestFunction'";
-
-               String alterFunctionInWrongDB = "ALTER FUNCTION 
default_catalog.db1.f4 " +
-                       "as 'org.apache.flink.function.TestFunction'";
-
-               try {
-                       tableEnv.sqlUpdate(alterUndefinedFunction);
-                       fail();
-               } catch (Exception e){
-                       assertEquals(e.getMessage(),
-                               "Function default_database.f4 does not exist in 
Catalog default_catalog.");
-               }
-
-               try {
-                       tableEnv.sqlUpdate(alterFunctionInWrongCatalog);
-                       fail();
-               } catch (Exception e) {
-                       assertTrue(e.getMessage().equals("Catalog catalog1 does 
not exist"));
-               }
-
-               try {
-                       tableEnv.sqlUpdate(alterFunctionInWrongDB);
-                       fail();
-               } catch (Exception e) {
-                       assertEquals(e.getMessage(), "Function db1.f4 does not 
exist" +
-                               " in Catalog default_catalog.");
-               }
-       }
-
-       @Test
-       public void testAlterTemporaryCatalogFunction() {
-               String alterTemporary = "ALTER TEMPORARY FUNCTION 
default_catalog.default_database.f4" +
-                       " as 'org.apache.flink.function.TestFunction'";
-
-               try {
-                       tableEnv.sqlUpdate(alterTemporary);
-                       fail();
-               } catch (Exception e) {
-                       assertTrue(e.getMessage().equals("Alter temporary 
catalog function is not supported"));
-               }
-       }
-
-       @Test
-       public void testAlterTemporarySystemFunction() {
-               String alterTemporary = "ALTER TEMPORARY SYSTEM FUNCTION 
default_catalog.default_database.f4" +
-                       " as 'org.apache.flink.function.TestFunction'";
-
-               try {
-                       tableEnv.sqlUpdate(alterTemporary);
-                       fail();
-               } catch (Exception e) {
-                       assertTrue(e.getMessage().equals("Alter temporary 
system function is not supported"));
-               }
-       }
-
-       @Test
-       public void testDropFunctionNonExists() {
-               String dropUndefinedFunction = "DROP FUNCTION 
default_catalog.default_database.f4";
-
-               String dropFunctionInWrongCatalog = "DROP FUNCTION 
catalog1.default_database.f4";
-
-               String dropFunctionInWrongDB = "DROP FUNCTION 
default_catalog.db1.f4";
-
-               try {
-                       tableEnv.sqlUpdate(dropUndefinedFunction);
-                       fail();
-               } catch (Exception e){
-                       assertEquals(e.getMessage(),
-                               "Function default_database.f4 does not exist in 
Catalog default_catalog.");
-               }
-
-               try {
-                       tableEnv.sqlUpdate(dropFunctionInWrongCatalog);
-                       fail();
-               } catch (Exception e) {
-                       assertTrue(e.getMessage().equals("Catalog catalog1 does 
not exist"));
-               }
-
-               try {
-                       tableEnv.sqlUpdate(dropFunctionInWrongDB);
-                       fail();
-               } catch (Exception e) {
-                       assertEquals(e.getMessage(),
-                               "Function db1.f4 does not exist in Catalog 
default_catalog.");
-               }
-       }
-
-       @Test
-       public void testDropTemporaryFunctionNonExits() {
-               String dropUndefinedFunction = "DROP TEMPORARY FUNCTION 
default_catalog.default_database.f4";
-               String dropFunctionInWrongCatalog = "DROP TEMPORARY FUNCTION 
catalog1.default_database.f4";
-               String dropFunctionInWrongDB = "DROP TEMPORARY FUNCTION 
default_catalog.db1.f4";
-
-               try {
-                       tableEnv.sqlUpdate(dropUndefinedFunction);
-                       fail();
-               } catch (Exception e){
-                       assertEquals(e.getMessage(), "Temporary catalog 
function" +
-                               " `default_catalog`.`default_database`.`f4` 
doesn't exist");
-               }
-
-               try {
-                       tableEnv.sqlUpdate(dropFunctionInWrongCatalog);
-                       fail();
-               } catch (Exception e) {
-                       assertEquals(e.getMessage(), "Temporary catalog 
function " +
-                               "`catalog1`.`default_database`.`f4` doesn't 
exist");
-               }
-
-               try {
-                       tableEnv.sqlUpdate(dropFunctionInWrongDB);
-                       fail();
-               } catch (Exception e) {
-                       assertEquals(e.getMessage(), "Temporary catalog 
function " +
-                               "`default_catalog`.`db1`.`f4` doesn't exist");
-               }
-       }
-
-       @Test
-       public void 
testCreateDropTemporaryCatalogFunctionsWithDifferentIdentifier() {
-               String createNoCatalogDB = "create temporary function f4" +
-                       " as 
'org.apache.flink.table.functions.FunctionTestBase$TestUDF'";
-
-               String dropNoCatalogDB = "drop temporary function f4";
-
-               tableEnv.sqlUpdate(createNoCatalogDB);
-               tableEnv.sqlUpdate(dropNoCatalogDB);
-
-               String createNonExistsCatalog = "create temporary function 
catalog1.default_database.f4" +
-                       " as 
'org.apache.flink.table.functions.FunctionTestBase$TestUDF'";
-
-               String dropNonExistsCatalog = "drop temporary function 
catalog1.default_database.f4";
-
-               tableEnv.sqlUpdate(createNonExistsCatalog);
-               tableEnv.sqlUpdate(dropNonExistsCatalog);
-
-               String createNonExistsDB = "create temporary function 
default_catalog.db1.f4" +
-                       " as 
'org.apache.flink.table.functions.FunctionTestBase$TestUDF'";
-
-               String dropNonExistsDB = "drop temporary function 
default_catalog.db1.f4";
-
-               tableEnv.sqlUpdate(createNonExistsDB);
-               tableEnv.sqlUpdate(dropNonExistsDB);
-       }
-
-       @Test
-       public void testDropTemporarySystemFunction() {
-               String ddl1 = "create temporary system function f5" +
-                       " as 
'org.apache.flink.table.functions.FunctionTestBase$TestUDF'";
-
-               String ddl2 = "drop temporary system function f5";
-
-               String ddl3 = "drop temporary system function if exists f5";
-
-               tableEnv.sqlUpdate(ddl1);
-               tableEnv.sqlUpdate(ddl2);
-               tableEnv.sqlUpdate(ddl3);
-
-               try {
-                       tableEnv.sqlUpdate(ddl2);
-               } catch (Exception e) {
-                       assertEquals(e.getMessage(), "Temporary system function 
f5 doesn't exist");
-               }
-       }
-
-       @Test
-       public void testUseDefinedRegularCatalogFunction() throws Exception {
-               String functionDDL = "create function addOne as " +
-                       
"'org.apache.flink.table.functions.FunctionTestBase$TestUDF'";
-
-               String dropFunctionDDL = "drop function addOne";
-               testUseDefinedCatalogFunction(functionDDL);
-               // delete the function
-               tableEnv.sqlUpdate(dropFunctionDDL);
-       }
-
-       @Test
-       public void testUseDefinedTemporaryCatalogFunction() throws Exception {
-               String functionDDL = "create temporary function addOne as " +
-                       
"'org.apache.flink.table.functions.FunctionTestBase$TestUDF'";
-
-               String dropFunctionDDL = "drop temporary function addOne";
-               testUseDefinedCatalogFunction(functionDDL);
-               // delete the function
-               tableEnv.sqlUpdate(dropFunctionDDL);
-       }
-
-       @Test
-       public void testUseDefinedTemporarySystemFunction() throws Exception {
-               String functionDDL = "create temporary system function addOne 
as " +
-                       
"'org.apache.flink.table.functions.FunctionTestBase$TestUDF'";
-
-               String dropFunctionDDL = "drop temporary system function 
addOne";
-               testUseDefinedCatalogFunction(functionDDL);
-               // delete the function
-               tableEnv.sqlUpdate(dropFunctionDDL);
-       }
-
-       private void testUseDefinedCatalogFunction(String createFunctionDDL) 
throws Exception {
-               List<Row> sourceData = Arrays.asList(
-                       toRow(1, "1000", 2),
-                       toRow(2, "1", 3),
-                       toRow(3, "2000", 4),
-                       toRow(1, "2", 2),
-                       toRow(2, "3000", 3)
-               );
-
-               TestCollectionTableFactory.reset();
-               TestCollectionTableFactory.initData(sourceData, new 
ArrayList<Row>(), -1);
-
-               String sourceDDL = "create table t1(a int, b varchar, c int) 
with ('connector' = 'COLLECTION')";
-               String sinkDDL = "create table t2(a int, b varchar, c int) with 
('connector' = 'COLLECTION')";
-               String query = " insert into t2 select t1.a, t1.b, addOne(t1.a, 
1) as c from t1";
-
-               tableEnv.sqlUpdate(sourceDDL);
-               tableEnv.sqlUpdate(sinkDDL);
-               tableEnv.sqlUpdate(createFunctionDDL);
-               tableEnv.sqlUpdate(query);
-               execute();
-
-               Row[] result = TestCollectionTableFactory.RESULT().toArray(new 
Row[0]);
-               Row[] expected = sourceData.toArray(new Row[0]);
-               assertArrayEquals(expected, result);
-
-               tableEnv.sqlUpdate("drop table t1");
-               tableEnv.sqlUpdate("drop table t2");
-       }
-
-       private Row toRow(Object ... objects) {
-               Row row = new Row(objects.length);
-               for (int i = 0; i < objects.length; i++) {
-                       row.setField(i, objects[i]);
-               }
-
-               return row;
-       }
-
-       /**
-        * Test udf class.
-        */
-       public static class TestUDF extends ScalarFunction {
-
-               public Integer eval(Integer a, Integer b) {
-                       return a + b;
-               }
-       }
-}
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/batch/sql/FunctionITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/batch/sql/FunctionITCase.java
deleted file mode 100644
index 7e2a10f..0000000
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/batch/sql/FunctionITCase.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.batch.sql;
-
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.table.api.java.BatchTableEnvironment;
-import org.apache.flink.table.functions.FunctionTestBase;
-
-import org.junit.BeforeClass;
-
-/**
- * Tests for catalog and system function in batch table environment.
- */
-public class FunctionITCase extends FunctionTestBase {
-       private static ExecutionEnvironment executionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment();
-
-       @BeforeClass
-       public static void setup() {
-               BatchTableEnvironment batchTableEnvironment = 
BatchTableEnvironment.create(executionEnvironment);
-               setTableEnv(batchTableEnvironment);
-       }
-
-       @Override
-       public void execute() throws Exception {
-               executionEnvironment.execute();
-       }
-}
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/sql/FunctionITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/sql/FunctionITCase.java
index 045c4b0..6ccfc02 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/sql/FunctionITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/sql/FunctionITCase.java
@@ -19,26 +19,431 @@
 package org.apache.flink.table.runtime.stream.sql;
 
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
-import org.apache.flink.table.functions.FunctionTestBase;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.factories.utils.TestCollectionTableFactory;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
 
-import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
- * Tests for catalog and system function in stream table environment.
+ * Tests for both catalog and system function.
  */
-public class FunctionITCase extends FunctionTestBase {
-       private static StreamExecutionEnvironment streamExecEnvironment;
+public class FunctionITCase extends AbstractTestBase {
+
+       private static final String TEST_FUNCTION = TestUDF.class.getName();
+
+       /**
+        * Creates a catalog function under an unqualified name, checks that it is listed, then
+        * drops it through its fully qualified name and checks that it is no longer listed.
+        */
+       @Test
+       public void testCreateCatalogFunctionInDefaultCatalog() {
+               final TableEnvironment env = getTableEnvironment();
+
+               env.sqlUpdate("create function f1 as 'org.apache.flink.function.TestFunction'");
+               assertTrue(Arrays.asList(env.listFunctions()).contains("f1"));
+
+               env.sqlUpdate("DROP FUNCTION IF EXISTS default_catalog.default_database.f1");
+               assertFalse(Arrays.asList(env.listFunctions()).contains("f1"));
+       }
+
+       /**
+        * Creates a catalog function using a catalog.database.function identifier, checks that it
+        * is listed, then drops it and checks that it is no longer listed.
+        */
+       @Test
+       public void testCreateFunctionWithFullPath() {
+               final TableEnvironment env = getTableEnvironment();
+               final String createFunction =
+                       "create function default_catalog.default_database.f2 as"
+                               + " 'org.apache.flink.function.TestFunction'";
+
+               env.sqlUpdate(createFunction);
+               assertTrue(Arrays.asList(env.listFunctions()).contains("f2"));
+
+               env.sqlUpdate("DROP FUNCTION IF EXISTS default_catalog.default_database.f2");
+               assertFalse(Arrays.asList(env.listFunctions()).contains("f2"));
+       }
+
+       /**
+        * Creates a catalog function using a database.function identifier (no catalog part),
+        * checks that it is listed, then drops it via the fully qualified name.
+        */
+       @Test
+       public void testCreateFunctionWithoutCatalogIdentifier() {
+               final TableEnvironment env = getTableEnvironment();
+               final String createFunction =
+                       "create function default_database.f3 as"
+                               + " 'org.apache.flink.function.TestFunction'";
+
+               env.sqlUpdate(createFunction);
+               assertTrue(Arrays.asList(env.listFunctions()).contains("f3"));
+
+               env.sqlUpdate("DROP FUNCTION IF EXISTS default_catalog.default_database.f3");
+               assertFalse(Arrays.asList(env.listFunctions()).contains("f3"));
+       }
+
+       /**
+        * Creating a function inside a catalog that is not registered must fail with a
+        * "Catalog ... does not exist" error.
+        */
+       @Test
+       public void testCreateFunctionCatalogNotExists() {
+               TableEnvironment tableEnv = getTableEnvironment();
+               String ddl1 = "create function catalog1.database1.f3 as 'org.apache.flink.function.TestFunction'";
+
+               try {
+                       tableEnv.sqlUpdate(ddl1);
+                       // Without this the test would pass silently if no exception were thrown.
+                       // fail() raises an AssertionError, which the catch below does not intercept.
+                       fail();
+               } catch (Exception e) {
+                       assertEquals("Catalog catalog1 does not exist", e.getMessage());
+               }
+       }
+
+       /**
+        * Creating a function inside a database that does not exist in the default catalog must
+        * fail with the "Could not execute CREATE CATALOG FUNCTION" error.
+        */
+       @Test
+       public void testCreateFunctionDBNotExists() {
+               TableEnvironment tableEnv = getTableEnvironment();
+               String ddl1 = "create function default_catalog.database1.f3 as 'org.apache.flink.function.TestFunction'";
+
+               try {
+                       tableEnv.sqlUpdate(ddl1);
+                       // Without this the test would pass silently if no exception were thrown.
+                       fail();
+               } catch (Exception e) {
+                       // JUnit order is (expected, actual); the reverse yields misleading failure output.
+                       assertEquals(
+                               "Could not execute CREATE CATALOG FUNCTION:" +
+                                       " (catalogFunction: [Optional[This is a user-defined function]], identifier:" +
+                                       " [`default_catalog`.`database1`.`f3`], ignoreIfExists: [false])",
+                               e.getMessage());
+               }
+       }
+
+       /**
+        * Exercises create / create-if-not-exists / drop / drop-if-exists for a temporary catalog
+        * function, including the error cases of creating a duplicate and dropping a missing one.
+        */
+       @Test
+       public void testCreateTemporaryCatalogFunction() {
+               TableEnvironment tableEnv = getTableEnvironment();
+               String ddl1 = "create temporary function default_catalog.default_database.f4" +
+                       " as '" + TEST_FUNCTION + "'";
+
+               String ddl2 = "create temporary function if not exists default_catalog.default_database.f4" +
+                       " as '" + TEST_FUNCTION + "'";
+
+               String ddl3 = "drop temporary function default_catalog.default_database.f4";
+
+               String ddl4 = "drop temporary function if exists default_catalog.default_database.f4";
+
+               tableEnv.sqlUpdate(ddl1);
+               assertTrue(Arrays.asList(tableEnv.listFunctions()).contains("f4"));
+
+               // "if not exists" on an existing function must leave it registered
+               tableEnv.sqlUpdate(ddl2);
+               assertTrue(Arrays.asList(tableEnv.listFunctions()).contains("f4"));
+
+               tableEnv.sqlUpdate(ddl3);
+               assertFalse(Arrays.asList(tableEnv.listFunctions()).contains("f4"));
+
+               // creating the same temporary function twice must be rejected
+               tableEnv.sqlUpdate(ddl1);
+               try {
+                       tableEnv.sqlUpdate(ddl1);
+                       // Without this the duplicate-creation check would pass silently.
+                       fail();
+               } catch (Exception e) {
+                       assertTrue(e instanceof ValidationException);
+                       // JUnit order is (expected, actual)
+                       assertEquals(
+                               "Temporary catalog function `default_catalog`.`default_database`.`f4`" +
+                                       " is already defined",
+                               e.getMessage());
+               }
+
+               // after the function is gone, "drop if exists" succeeds but a plain drop must fail
+               tableEnv.sqlUpdate(ddl3);
+               tableEnv.sqlUpdate(ddl4);
+               try {
+                       tableEnv.sqlUpdate(ddl3);
+                       // Without this the missing-function check would pass silently.
+                       fail();
+               } catch (Exception e) {
+                       assertTrue(e instanceof ValidationException);
+                       assertEquals(
+                               "Temporary catalog function `default_catalog`.`default_database`.`f4`" +
+                                       " doesn't exist",
+                               e.getMessage());
+               }
+       }
+
+       /**
+        * Runs create, create-if-not-exists and drop statements for a temporary system function;
+        * the test passes when none of the statements throws.
+        */
+       @Test
+       public void testCreateTemporarySystemFunction() {
+               final TableEnvironment env = getTableEnvironment();
+
+               final String create = "create temporary system function default_catalog.default_database.f5" +
+                       " as '" + TEST_FUNCTION + "'";
+               // NOTE(review): this statement names a different class than TEST_FUNCTION;
+               // presumably it is never resolved because f5 already exists at that point -- confirm.
+               final String createIfNotExists =
+                       "create temporary system function if not exists default_catalog.default_database.f5" +
+                               " as 'org.apache.flink.table.functions.CatalogFunctionTestBase$TestUDF'";
+               final String drop = "drop temporary system function default_catalog.default_database.f5";
+
+               env.sqlUpdate(create);
+               env.sqlUpdate(createIfNotExists);
+               env.sqlUpdate(drop);
+       }
+
+       /**
+        * Creates catalog function f3, alters it to another class name and verifies through the
+        * catalog that the stored class name changes accordingly.
+        */
+       @Test
+       public void testAlterFunction() throws Exception {
+               final TableEnvironment env = getTableEnvironment();
+               final ObjectPath functionPath = new ObjectPath("default_database", "f3");
+
+               assertTrue(env.getCatalog("default_catalog").isPresent());
+               final Catalog defaultCatalog = env.getCatalog("default_catalog").get();
+
+               env.sqlUpdate("create function f3 as 'org.apache.flink.function.TestFunction'");
+               final CatalogFunction created = defaultCatalog.getFunction(functionPath);
+               assertEquals("org.apache.flink.function.TestFunction", created.getClassName());
+
+               env.sqlUpdate("alter function f3 as 'org.apache.flink.function.TestFunction2'");
+               final CatalogFunction altered = defaultCatalog.getFunction(functionPath);
+               assertEquals("org.apache.flink.function.TestFunction2", altered.getClassName());
+       }
+
+       /**
+        * ALTER FUNCTION must fail when the function, its catalog, or its database does not exist.
+        */
+       @Test
+       public void testAlterFunctionNonExists() {
+               TableEnvironment tableEnv = getTableEnvironment();
+               String alterUndefinedFunction = "ALTER FUNCTION default_catalog.default_database.f4" +
+                       " as 'org.apache.flink.function.TestFunction'";
+
+               String alterFunctionInWrongCatalog = "ALTER FUNCTION catalog1.default_database.f4 " +
+                       "as 'org.apache.flink.function.TestFunction'";
+
+               String alterFunctionInWrongDB = "ALTER FUNCTION default_catalog.db1.f4 " +
+                       "as 'org.apache.flink.function.TestFunction'";
+
+               // All assertions use JUnit's (expected, actual) order so failure output reads correctly.
+               try {
+                       tableEnv.sqlUpdate(alterUndefinedFunction);
+                       fail();
+               } catch (Exception e) {
+                       assertEquals(
+                               "Function default_database.f4 does not exist in Catalog default_catalog.",
+                               e.getMessage());
+               }
+
+               try {
+                       tableEnv.sqlUpdate(alterFunctionInWrongCatalog);
+                       fail();
+               } catch (Exception e) {
+                       assertEquals("Catalog catalog1 does not exist", e.getMessage());
+               }
+
+               try {
+                       tableEnv.sqlUpdate(alterFunctionInWrongDB);
+                       fail();
+               } catch (Exception e) {
+                       assertEquals(
+                               "Function db1.f4 does not exist" +
+                                       " in Catalog default_catalog.",
+                               e.getMessage());
+               }
+       }
+
+       /**
+        * ALTER TEMPORARY FUNCTION is expected to be rejected as unsupported.
+        */
+       @Test
+       public void testAlterTemporaryCatalogFunction() {
+               final TableEnvironment env = getTableEnvironment();
+               final String statement =
+                       "ALTER TEMPORARY FUNCTION default_catalog.default_database.f4" +
+                               " as 'org.apache.flink.function.TestFunction'";
+
+               try {
+                       env.sqlUpdate(statement);
+                       fail();
+               } catch (Exception expectedFailure) {
+                       final String message = expectedFailure.getMessage();
+                       assertEquals("Alter temporary catalog function is not supported", message);
+               }
+       }
+
+       /**
+        * ALTER TEMPORARY SYSTEM FUNCTION is expected to be rejected as unsupported.
+        */
+       @Test
+       public void testAlterTemporarySystemFunction() {
+               final TableEnvironment env = getTableEnvironment();
+               final String statement =
+                       "ALTER TEMPORARY SYSTEM FUNCTION default_catalog.default_database.f4" +
+                               " as 'org.apache.flink.function.TestFunction'";
+
+               try {
+                       env.sqlUpdate(statement);
+                       fail();
+               } catch (Exception expectedFailure) {
+                       final String message = expectedFailure.getMessage();
+                       assertEquals("Alter temporary system function is not supported", message);
+               }
+       }
+
+       /**
+        * DROP FUNCTION (without IF EXISTS) must fail when the function, its catalog, or its
+        * database does not exist.
+        */
+       @Test
+       public void testDropFunctionNonExists() {
+               TableEnvironment tableEnv = getTableEnvironment();
+               String dropUndefinedFunction = "DROP FUNCTION default_catalog.default_database.f4";
+
+               String dropFunctionInWrongCatalog = "DROP FUNCTION catalog1.default_database.f4";
+
+               String dropFunctionInWrongDB = "DROP FUNCTION default_catalog.db1.f4";
+
+               // All assertions use JUnit's (expected, actual) order so failure output reads correctly.
+               try {
+                       tableEnv.sqlUpdate(dropUndefinedFunction);
+                       fail();
+               } catch (Exception e) {
+                       assertEquals(
+                               "Function default_database.f4 does not exist in Catalog default_catalog.",
+                               e.getMessage());
+               }
+
+               try {
+                       tableEnv.sqlUpdate(dropFunctionInWrongCatalog);
+                       fail();
+               } catch (Exception e) {
+                       assertEquals("Catalog catalog1 does not exist", e.getMessage());
+               }
+
+               try {
+                       tableEnv.sqlUpdate(dropFunctionInWrongDB);
+                       fail();
+               } catch (Exception e) {
+                       assertEquals(
+                               "Function db1.f4 does not exist in Catalog default_catalog.",
+                               e.getMessage());
+               }
+       }
+
+       /**
+        * DROP TEMPORARY FUNCTION (without IF EXISTS) must fail for an unknown function, whether
+        * the identifier points at the default path, an unknown catalog, or an unknown database.
+        */
+       @Test
+       public void testDropTemporaryFunctionNonExits() {
+               TableEnvironment tableEnv = getTableEnvironment();
+               String dropUndefinedFunction = "DROP TEMPORARY FUNCTION default_catalog.default_database.f4";
+               String dropFunctionInWrongCatalog = "DROP TEMPORARY FUNCTION catalog1.default_database.f4";
+               String dropFunctionInWrongDB = "DROP TEMPORARY FUNCTION default_catalog.db1.f4";
+
+               // All assertions use JUnit's (expected, actual) order so failure output reads correctly.
+               try {
+                       tableEnv.sqlUpdate(dropUndefinedFunction);
+                       fail();
+               } catch (Exception e) {
+                       assertEquals(
+                               "Temporary catalog function" +
+                                       " `default_catalog`.`default_database`.`f4` doesn't exist",
+                               e.getMessage());
+               }
+
+               try {
+                       tableEnv.sqlUpdate(dropFunctionInWrongCatalog);
+                       fail();
+               } catch (Exception e) {
+                       assertEquals(
+                               "Temporary catalog function " +
+                                       "`catalog1`.`default_database`.`f4` doesn't exist",
+                               e.getMessage());
+               }
+
+               try {
+                       tableEnv.sqlUpdate(dropFunctionInWrongDB);
+                       fail();
+               } catch (Exception e) {
+                       assertEquals(
+                               "Temporary catalog function " +
+                                       "`default_catalog`.`db1`.`f4` doesn't exist",
+                               e.getMessage());
+               }
+       }
+
+       /**
+        * Creates and immediately drops a temporary function under three identifier forms: an
+        * unqualified name, a name in catalog "catalog1", and a name in database "db1". The test
+        * passes when none of the statements throws.
+        */
+       @Test
+       public void testCreateDropTemporaryCatalogFunctionsWithDifferentIdentifier() {
+               final TableEnvironment env = getTableEnvironment();
+
+               // unqualified identifier
+               env.sqlUpdate("create temporary function f4" +
+                       " as '" + TEST_FUNCTION + "'");
+               env.sqlUpdate("drop temporary function f4");
+
+               // identifier in catalog "catalog1"
+               env.sqlUpdate("create temporary function catalog1.default_database.f4" +
+                       " as '" + TEST_FUNCTION + "'");
+               env.sqlUpdate("drop temporary function catalog1.default_database.f4");
+
+               // identifier in database "db1"
+               env.sqlUpdate("create temporary function default_catalog.db1.f4" +
+                       " as '" + TEST_FUNCTION + "'");
+               env.sqlUpdate("drop temporary function default_catalog.db1.f4");
+       }
+
+       /**
+        * A temporary system function can be created and dropped; "drop ... if exists" tolerates a
+        * missing function, while a plain drop of a missing function must fail.
+        */
+       @Test
+       public void testDropTemporarySystemFunction() {
+               TableEnvironment tableEnv = getTableEnvironment();
+               String ddl1 = "create temporary system function f5" +
+                       " as '" + TEST_FUNCTION + "'";
+
+               String ddl2 = "drop temporary system function f5";
+
+               String ddl3 = "drop temporary system function if exists f5";
+
+               tableEnv.sqlUpdate(ddl1);
+               tableEnv.sqlUpdate(ddl2);
+               tableEnv.sqlUpdate(ddl3);
+
+               try {
+                       tableEnv.sqlUpdate(ddl2);
+                       // Without this the test would pass silently if no exception were thrown.
+                       fail();
+               } catch (Exception e) {
+                       // JUnit order is (expected, actual)
+                       assertEquals("Temporary system function f5 doesn't exist", e.getMessage());
+               }
+       }
+
+       /**
+        * Runs the shared addOne query scenario with addOne registered as a regular catalog
+        * function, then drops the function.
+        */
+       @Test
+       public void testUserDefinedRegularCatalogFunction() throws Exception {
+               final TableEnvironment env = getTableEnvironment();
+               final String createAddOne = "create function addOne as " +
+                       "'" + TEST_FUNCTION + "'";
+
+               testUserDefinedCatalogFunction(env, createAddOne);
+
+               // remove the function again
+               env.sqlUpdate("drop function addOne");
+       }
+
+       /**
+        * Runs the shared addOne query scenario with addOne registered as a temporary catalog
+        * function, then drops the function.
+        */
+       @Test
+       public void testUserDefinedTemporaryCatalogFunction() throws Exception {
+               final TableEnvironment env = getTableEnvironment();
+               final String createAddOne = "create temporary function addOne as " +
+                       "'" + TEST_FUNCTION + "'";
+
+               testUserDefinedCatalogFunction(env, createAddOne);
+
+               // remove the function again
+               env.sqlUpdate("drop temporary function addOne");
+       }
+
+       /**
+        * Runs the shared addOne query scenario with addOne registered as a temporary system
+        * function, then drops the function.
+        */
+       @Test
+       public void testUserDefinedTemporarySystemFunction() throws Exception {
+               final TableEnvironment env = getTableEnvironment();
+               final String createAddOne = "create temporary system function addOne as " +
+                       "'" + TEST_FUNCTION + "'";
+
+               testUserDefinedCatalogFunction(env, createAddOne);
+
+               // remove the function again
+               env.sqlUpdate("drop temporary system function addOne");
+       }
+
+       /**
+        * Scalar function registered by the DDL statements in this class; it returns the sum of
+        * its two integer arguments.
+        */
+       public static class TestUDF extends ScalarFunction {
+
+               public Integer eval(Integer a, Integer b) {
+                       final int sum = a + b;
+                       return sum;
+               }
+       }
+
+       /**
+        * Shared scenario for the user-defined function tests: registers a source and a sink
+        * table, runs the given function DDL, executes an INSERT that calls {@code addOne}, and
+        * compares the collected result with the source rows.
+        *
+        * @param tableEnv table environment to run the statements in
+        * @param createFunctionDDL DDL statement that registers the function named {@code addOne}
+        * @throws Exception if executing the job fails
+        */
+       private void testUserDefinedCatalogFunction(TableEnvironment tableEnv, 
String createFunctionDDL) throws Exception {
+               // In every row the third field equals the first field plus one.
+               List<Row> sourceData = Arrays.asList(
+                       Row.of(1, "1000", 2),
+                       Row.of(2, "1", 3),
+                       Row.of(3, "2000", 4),
+                       Row.of(1, "2", 2),
+                       Row.of(2, "3000", 3)
+               );
+
+               // NOTE(review): presumably clears shared factory state and seeds the source rows;
+               // the meaning of the second and third initData arguments is not visible here -- confirm.
+               TestCollectionTableFactory.reset();
+               TestCollectionTableFactory.initData(sourceData, new 
ArrayList<>(), -1);
+
+               String sourceDDL = "create table t1(a int, b varchar, c int) 
with ('connector' = 'COLLECTION')";
+               String sinkDDL = "create table t2(a int, b varchar, c int) with 
('connector' = 'COLLECTION')";
+               String query = " insert into t2 select t1.a, t1.b, addOne(t1.a, 
1) as c from t1";
+
+               tableEnv.sqlUpdate(sourceDDL);
+               tableEnv.sqlUpdate(sinkDDL);
+               tableEnv.sqlUpdate(createFunctionDDL);
+               tableEnv.sqlUpdate(query);
+               tableEnv.execute("Test job");
+
+               // The callers in this class bind addOne to TestUDF, which adds its arguments, so
+               // addOne(a, 1) reproduces column c and the result is expected to equal the source rows.
+               Row[] result = TestCollectionTableFactory.RESULT().toArray(new 
Row[0]);
+               Row[] expected = sourceData.toArray(new Row[0]);
+               assertArrayEquals(expected, result);
 
-       @BeforeClass
-       public static void setup() {
-               streamExecEnvironment = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               StreamTableEnvironment streamTableEnvironment = 
StreamTableEnvironment.create(streamExecEnvironment);
-               setTableEnv(streamTableEnvironment);
+               // drop both tables again
+               tableEnv.sqlUpdate("drop table t1");
+               tableEnv.sqlUpdate("drop table t2");
        }
 
-       @Override
-       public void execute() throws Exception {
-               streamExecEnvironment.execute();
+       /**
+        * Builds a new {@link StreamTableEnvironment} on top of
+        * {@code StreamExecutionEnvironment.getExecutionEnvironment()}; every call returns a
+        * separately created table environment.
+        */
+       private TableEnvironment getTableEnvironment() {
+               return StreamTableEnvironment.create(StreamExecutionEnvironment.getExecutionEnvironment());
        }
 }

Reply via email to