This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git
commit af4d880779738dd2a4817489b0e91bf68c670da9 Author: Timo Walther <[email protected]> AuthorDate: Tue Dec 17 10:46:13 2019 +0100 [hotfix][table] Update function tests to use testing infrastructure --- .../table/planner/functions/FunctionTestBase.java | 362 ----------------- .../planner/runtime/batch/sql/FunctionITCase.java | 39 -- .../planner/runtime/stream/sql/FunctionITCase.java | 390 ++++++++++++++++-- .../flink/table/functions/FunctionTestBase.java | 436 --------------------- .../table/runtime/batch/sql/FunctionITCase.java | 43 -- .../table/runtime/stream/sql/FunctionITCase.java | 431 +++++++++++++++++++- 6 files changed, 767 insertions(+), 934 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/FunctionTestBase.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/FunctionTestBase.java deleted file mode 100644 index e788df9..0000000 --- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/FunctionTestBase.java +++ /dev/null @@ -1,362 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.planner.functions; - -import org.apache.flink.table.api.TableEnvironment; -import org.apache.flink.table.api.ValidationException; -import org.apache.flink.table.catalog.Catalog; -import org.apache.flink.table.catalog.CatalogFunction; -import org.apache.flink.table.catalog.ObjectPath; -import org.apache.flink.table.functions.ScalarFunction; -import org.apache.flink.types.Row; - -import org.junit.Test; - -import java.util.Arrays; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -/** - * Tests for {@link CatalogFunction}. - */ -public abstract class FunctionTestBase { - protected static TableEnvironment tableEnv; - - @Test - public void testCreateCatalogFunctionInDefaultCatalog() { - String ddl1 = "create function f1 as 'org.apache.flink.function.TestFunction'"; - tableEnv.sqlUpdate(ddl1); - assertTrue(Arrays.asList(tableEnv.listFunctions()).contains("f1")); - - tableEnv.sqlUpdate("DROP FUNCTION IF EXISTS default_catalog.default_database.f1"); - assertFalse(Arrays.asList(tableEnv.listFunctions()).contains("f1")); - } - - @Test - public void testCreateFunctionWithFullPath() { - String ddl1 = "create function default_catalog.default_database.f2 as" + - " 'org.apache.flink.function.TestFunction'"; - tableEnv.sqlUpdate(ddl1); - assertTrue(Arrays.asList(tableEnv.listFunctions()).contains("f2")); - - tableEnv.sqlUpdate("DROP FUNCTION IF EXISTS default_catalog.default_database.f2"); - assertFalse(Arrays.asList(tableEnv.listFunctions()).contains("f2")); - } - - @Test - public void testCreateFunctionWithoutCatalogIdentifier() { - String ddl1 = "create function default_database.f3 as" + - " 'org.apache.flink.function.TestFunction'"; - tableEnv.sqlUpdate(ddl1); - assertTrue(Arrays.asList(tableEnv.listFunctions()).contains("f3")); - - tableEnv.sqlUpdate("DROP FUNCTION IF EXISTS default_catalog.default_database.f3"); - assertFalse(Arrays.asList(tableEnv.listFunctions()).contains("f3")); - } - - @Test - public void testCreateFunctionCatalogNotExists() { - - String ddl1 = "create function catalog1.database1.f3 as 'org.apache.flink.function.TestFunction'"; - - try { - tableEnv.sqlUpdate(ddl1); - } catch (Exception e){ - assertTrue(e.getMessage().equals("Catalog catalog1 does not exist")); - } - } - - @Test - public void testCreateFunctionDBNotExists() { - String ddl1 = "create function default_catalog.database1.f3 as 'org.apache.flink.function.TestFunction'"; - - try { - tableEnv.sqlUpdate(ddl1); - } catch (Exception e){ - assertEquals(e.getMessage(), "Could not execute CREATE CATALOG FUNCTION:" + - " (catalogFunction: [Optional[This is a user-defined function]], identifier:" + - " [`default_catalog`.`database1`.`f3`], ignoreIfExists: [false])"); - } - } - - @Test - public void testCreateTemporaryCatalogFunction() { - String ddl1 = "create temporary function default_catalog.default_database.f4" + - " as 'org.apache.flink.table.planner.functions.FunctionTestBase$TestUDF'"; - - String ddl2 = "create temporary function if not exists default_catalog.default_database.f4" + - " as 'org.apache.flink.table.planner.functions.FunctionTestBase$TestUDF'"; - - String ddl3 = "drop temporary function default_catalog.default_database.f4"; - - String ddl4 = "drop temporary function if exists default_catalog.default_database.f4"; - - tableEnv.sqlUpdate(ddl1); - assertTrue(Arrays.asList(tableEnv.listFunctions()).contains("f4")); - - tableEnv.sqlUpdate(ddl2); - assertTrue(Arrays.asList(tableEnv.listFunctions()).contains("f4")); - - tableEnv.sqlUpdate(ddl3); - assertFalse(Arrays.asList(tableEnv.listFunctions()).contains("f4")); - - tableEnv.sqlUpdate(ddl1); - try { - tableEnv.sqlUpdate(ddl1); - } catch (Exception e) { - assertTrue(e instanceof ValidationException); - assertEquals(e.getMessage(), "Temporary catalog function " + - "`default_catalog`.`default_database`.`f4` is already defined"); - } - - tableEnv.sqlUpdate(ddl3); - tableEnv.sqlUpdate(ddl4); - try { - tableEnv.sqlUpdate(ddl3); - } catch (Exception e) { - assertTrue(e instanceof ValidationException); - assertEquals(e.getMessage(), - "Temporary catalog function `default_catalog`.`default_database`.`f4` doesn't exist"); - } - } - - @Test - public void testCreateTemporarySystemFunction() { - String ddl1 = "create temporary system function default_catalog.default_database.f5" + - " as 'org.apache.flink.table.planner.functions.FunctionTestBase$TestUDF'"; - - String ddl2 = "create temporary system function if not exists default_catalog.default_database.f5" + - " as 'org.apache.flink.table.planner.functions.FunctionTestBase$TestUDF'"; - - String ddl3 = "drop temporary system function default_catalog.default_database.f5"; - - tableEnv.sqlUpdate(ddl1); - tableEnv.sqlUpdate(ddl2); - tableEnv.sqlUpdate(ddl3); - } - - @Test - public void testAlterFunction() throws Exception { - String create = "create function f3 as 'org.apache.flink.function.TestFunction'"; - String alter = "alter function f3 as 'org.apache.flink.function.TestFunction2'"; - - ObjectPath objectPath = new ObjectPath("default_database", "f3"); - Catalog catalog = tableEnv.getCatalog("default_catalog").get(); - tableEnv.sqlUpdate(create); - CatalogFunction beforeUpdate = catalog.getFunction(objectPath); - assertEquals("org.apache.flink.function.TestFunction", beforeUpdate.getClassName()); - - tableEnv.sqlUpdate(alter); - CatalogFunction afterUpdate = catalog.getFunction(objectPath); - assertEquals("org.apache.flink.function.TestFunction2", afterUpdate.getClassName()); - } - - @Test - public void testAlterFunctionNonExists() { - String alterUndefinedFunction = "ALTER FUNCTION default_catalog.default_database.f4" + - " as 'org.apache.flink.function.TestFunction'"; - - String alterFunctionInWrongCatalog = "ALTER FUNCTION catalog1.default_database.f4 " + - "as 'org.apache.flink.function.TestFunction'"; - - String alterFunctionInWrongDB = "ALTER FUNCTION default_catalog.db1.f4 " + - "as 'org.apache.flink.function.TestFunction'"; - - try { - tableEnv.sqlUpdate(alterUndefinedFunction); - fail(); - } catch (Exception e){ - assertEquals(e.getMessage(), - "Function default_database.f4 does not exist in Catalog default_catalog."); - } - - try { - tableEnv.sqlUpdate(alterFunctionInWrongCatalog); - fail(); - } catch (Exception e) { - assertTrue(e.getMessage().equals("Catalog catalog1 does not exist")); - } - - try { - tableEnv.sqlUpdate(alterFunctionInWrongDB); - fail(); - } catch (Exception e) { - assertEquals(e.getMessage(), - "Function db1.f4 does not exist in Catalog default_catalog."); - } - } - - @Test - public void testAlterTemporaryCatalogFunction() { - String alterTemporary = "ALTER TEMPORARY FUNCTION default_catalog.default_database.f4" + - " as 'org.apache.flink.function.TestFunction'"; - - try { - tableEnv.sqlUpdate(alterTemporary); - fail(); - } catch (Exception e) { - assertTrue(e.getMessage().equals("Alter temporary catalog function is not supported")); - } - - } - - @Test - public void testAlterTemporarySystemFunction() { - String alterTemporary = "ALTER TEMPORARY SYSTEM FUNCTION default_catalog.default_database.f4" + - " as 'org.apache.flink.function.TestFunction'"; - - try { - tableEnv.sqlUpdate(alterTemporary); - fail(); - } catch (Exception e) { - assertTrue(e.getMessage().equals("Alter temporary system function is not supported")); - } - } - - @Test - public void testDropFunctionNonExists() { - String dropUndefinedFunction = "DROP FUNCTION default_catalog.default_database.f4"; - - String dropFunctionInWrongCatalog = "DROP FUNCTION catalog1.default_database.f4"; - - String dropFunctionInWrongDB = "DROP FUNCTION default_catalog.db1.f4"; - - try { - tableEnv.sqlUpdate(dropUndefinedFunction); - fail(); - } catch (Exception e){ - assertEquals(e.getMessage(), - "Function default_database.f4 does not exist in Catalog default_catalog."); - } - - try { - tableEnv.sqlUpdate(dropFunctionInWrongCatalog); - fail(); - } catch (Exception e) { - assertTrue(e.getMessage().equals("Catalog catalog1 does not exist")); - } - - try { - tableEnv.sqlUpdate(dropFunctionInWrongDB); - fail(); - } catch (Exception e) { - assertEquals(e.getMessage(), "Function db1.f4 does not exist in Catalog default_catalog."); - } - } - - @Test - public void testDropTemporaryFunctionNonExits() { - String dropUndefinedFunction = "DROP TEMPORARY FUNCTION default_catalog.default_database.f4"; - String dropFunctionInWrongCatalog = "DROP TEMPORARY FUNCTION catalog1.default_database.f4"; - String dropFunctionInWrongDB = "DROP TEMPORARY FUNCTION default_catalog.db1.f4"; - - try { - tableEnv.sqlUpdate(dropUndefinedFunction); - fail(); - } catch (Exception e){ - assertEquals(e.getMessage(), - "Temporary catalog function `default_catalog`.`default_database`.`f4` doesn't exist"); - } - - try { - tableEnv.sqlUpdate(dropFunctionInWrongCatalog); - fail(); - } catch (Exception e) { - assertEquals(e.getMessage(), - "Temporary catalog function `catalog1`.`default_database`.`f4` doesn't exist"); - } - - try { - tableEnv.sqlUpdate(dropFunctionInWrongDB); - fail(); - } catch (Exception e) { - assertEquals(e.getMessage(), "Temporary catalog function `default_catalog`.`db1`.`f4` doesn't exist"); - } - } - - @Test - public void testCreateAlterDropTemporaryCatalogFunctionsWithDifferentIdentifier() { - String createNoCatalogDB = "create temporary function f4" + - " as 'org.apache.flink.table.planner.functions.FunctionTestBase$TestUDF'"; - - String dropNoCatalogDB = "drop temporary function f4"; - - tableEnv.sqlUpdate(createNoCatalogDB); - tableEnv.sqlUpdate(dropNoCatalogDB); - - String createNonExistsCatalog = "create temporary function catalog1.default_database.f4" + - " as 'org.apache.flink.table.planner.functions.FunctionTestBase$TestUDF'"; - - String dropNonExistsCatalog = "drop temporary function catalog1.default_database.f4"; - - tableEnv.sqlUpdate(createNonExistsCatalog); - tableEnv.sqlUpdate(dropNonExistsCatalog); - - String createNonExistsDB = "create temporary function default_catalog.db1.f4" + - " as 'org.apache.flink.table.planner.functions.FunctionTestBase$TestUDF'"; - - String dropNonExistsDB = "drop temporary function default_catalog.db1.f4"; - - tableEnv.sqlUpdate(createNonExistsDB); - tableEnv.sqlUpdate(dropNonExistsDB); - } - - @Test - public void testDropTemporarySystemFunction() { - String ddl1 = "create temporary system function f5" + - " as 'org.apache.flink.table.planner.functions.FunctionTestBase$TestUDF'"; - - String ddl2 = "drop temporary system function f5"; - - String ddl3 = "drop temporary system function if exists f5"; - - tableEnv.sqlUpdate(ddl1); - tableEnv.sqlUpdate(ddl2); - tableEnv.sqlUpdate(ddl3); - - try { - tableEnv.sqlUpdate(ddl2); - } catch (Exception e) { - assertEquals(e.getMessage(), "Temporary system function f5 doesn't exist"); - } - } - - protected Row toRow(Object ... objects) { - Row row = new Row(objects.length); - for (int i = 0; i < objects.length; i++) { - row.setField(i, objects[i]); - } - - return row; - } - - /** - * Test udf class. - */ - public static class TestUDF extends ScalarFunction { - - public Integer eval(Integer a, Integer b) { - return a + b; - } - } -} - diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/FunctionITCase.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/FunctionITCase.java deleted file mode 100644 index a43c36a..0000000 --- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/FunctionITCase.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.planner.runtime.batch.sql; - -import org.apache.flink.table.api.EnvironmentSettings; -import org.apache.flink.table.planner.functions.FunctionTestBase; -import org.apache.flink.table.planner.utils.TestingTableEnvironment; - -import org.junit.BeforeClass; - -/** - * Tests for catalog and system functions in batch table environment. - */ -public class -FunctionITCase extends FunctionTestBase { - - @BeforeClass - public static void setup() { - EnvironmentSettings environmentSettings = - EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); - tableEnv = TestingTableEnvironment.create(environmentSettings); - } -} diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java index 8aa8d68..310647a 100644 --- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java @@ -18,14 +18,16 @@ package org.apache.flink.table.planner.runtime.stream.sql; -import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CatalogFunction; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.functions.ScalarFunction; import org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory; -import org.apache.flink.table.planner.functions.FunctionTestBase; -import org.apache.flink.table.planner.utils.TestingTableEnvironment; +import org.apache.flink.table.planner.runtime.utils.StreamingTestBase; import org.apache.flink.types.Row; -import org.junit.BeforeClass; import org.junit.Test; import java.util.ArrayList; @@ -33,82 +35,388 @@ import java.util.Arrays; import java.util.List; import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; /** * Tests for catalog and system in stream table environment. */ -public class FunctionITCase extends FunctionTestBase { +public class FunctionITCase extends StreamingTestBase { - @BeforeClass - public static void setup() { - EnvironmentSettings environmentSettings = - EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); - tableEnv = TestingTableEnvironment.create(environmentSettings); + private static final String TEST_FUNCTION = TestUDF.class.getName(); + + @Test + public void testCreateCatalogFunctionInDefaultCatalog() { + String ddl1 = "create function f1 as 'org.apache.flink.function.TestFunction'"; + tEnv().sqlUpdate(ddl1); + assertTrue(Arrays.asList(tEnv().listFunctions()).contains("f1")); + + tEnv().sqlUpdate("DROP FUNCTION IF EXISTS default_catalog.default_database.f1"); + assertFalse(Arrays.asList(tEnv().listFunctions()).contains("f1")); + } + + @Test + public void testCreateFunctionWithFullPath() { + String ddl1 = "create function default_catalog.default_database.f2 as" + + " 'org.apache.flink.function.TestFunction'"; + tEnv().sqlUpdate(ddl1); + assertTrue(Arrays.asList(tEnv().listFunctions()).contains("f2")); + + tEnv().sqlUpdate("DROP FUNCTION IF EXISTS default_catalog.default_database.f2"); + assertFalse(Arrays.asList(tEnv().listFunctions()).contains("f2")); + } + + @Test + public void testCreateFunctionWithoutCatalogIdentifier() { + String ddl1 = "create function default_database.f3 as" + + " 'org.apache.flink.function.TestFunction'"; + tEnv().sqlUpdate(ddl1); + assertTrue(Arrays.asList(tEnv().listFunctions()).contains("f3")); + + tEnv().sqlUpdate("DROP FUNCTION IF EXISTS default_catalog.default_database.f3"); + assertFalse(Arrays.asList(tEnv().listFunctions()).contains("f3")); + } + + @Test + public void testCreateFunctionCatalogNotExists() { + String ddl1 = "create function catalog1.database1.f3 as 'org.apache.flink.function.TestFunction'"; + + try { + tEnv().sqlUpdate(ddl1); + } catch (Exception e){ + assertEquals("Catalog catalog1 does not exist", e.getMessage()); + } + } + + @Test + public void testCreateFunctionDBNotExists() { + String ddl1 = "create function default_catalog.database1.f3 as 'org.apache.flink.function.TestFunction'"; + + try { + tEnv().sqlUpdate(ddl1); + } catch (Exception e){ + assertEquals(e.getMessage(), "Could not execute CREATE CATALOG FUNCTION:" + + " (catalogFunction: [Optional[This is a user-defined function]], identifier:" + + " [`default_catalog`.`database1`.`f3`], ignoreIfExists: [false])"); + } + } + + @Test + public void testCreateTemporaryCatalogFunction() { + String ddl1 = "create temporary function default_catalog.default_database.f4" + + " as '" + TEST_FUNCTION + "'"; + + String ddl2 = "create temporary function if not exists default_catalog.default_database.f4" + + " as '" + TEST_FUNCTION + "'"; + + String ddl3 = "drop temporary function default_catalog.default_database.f4"; + + String ddl4 = "drop temporary function if exists default_catalog.default_database.f4"; + + tEnv().sqlUpdate(ddl1); + assertTrue(Arrays.asList(tEnv().listFunctions()).contains("f4")); + + tEnv().sqlUpdate(ddl2); + assertTrue(Arrays.asList(tEnv().listFunctions()).contains("f4")); + + tEnv().sqlUpdate(ddl3); + assertFalse(Arrays.asList(tEnv().listFunctions()).contains("f4")); + + tEnv().sqlUpdate(ddl1); + try { + tEnv().sqlUpdate(ddl1); + } catch (Exception e) { + assertTrue(e instanceof ValidationException); + assertEquals(e.getMessage(), + "Temporary catalog function `default_catalog`.`default_database`.`f4`" + + " is already defined"); + } + + tEnv().sqlUpdate(ddl3); + tEnv().sqlUpdate(ddl4); + try { + tEnv().sqlUpdate(ddl3); + } catch (Exception e) { + assertTrue(e instanceof ValidationException); + assertEquals(e.getMessage(), + "Temporary catalog function `default_catalog`.`default_database`.`f4`" + + " doesn't exist"); + } + } + + @Test + public void testCreateTemporarySystemFunction() { + String ddl1 = "create temporary system function default_catalog.default_database.f5" + + " as '" + TEST_FUNCTION + "'"; + + String ddl2 = "create temporary system function if not exists default_catalog.default_database.f5" + + " as '" + TEST_FUNCTION + "'"; + + String ddl3 = "drop temporary system function default_catalog.default_database.f5"; + + tEnv().sqlUpdate(ddl1); + tEnv().sqlUpdate(ddl2); + tEnv().sqlUpdate(ddl3); + } + + @Test + public void testAlterFunction() throws Exception { + String create = "create function f3 as 'org.apache.flink.function.TestFunction'"; + String alter = "alter function f3 as 'org.apache.flink.function.TestFunction2'"; + + ObjectPath objectPath = new ObjectPath("default_database", "f3"); + assertTrue(tEnv().getCatalog("default_catalog").isPresent()); + Catalog catalog = tEnv().getCatalog("default_catalog").get(); + tEnv().sqlUpdate(create); + CatalogFunction beforeUpdate = catalog.getFunction(objectPath); + assertEquals("org.apache.flink.function.TestFunction", beforeUpdate.getClassName()); + + tEnv().sqlUpdate(alter); + CatalogFunction afterUpdate = catalog.getFunction(objectPath); + assertEquals("org.apache.flink.function.TestFunction2", afterUpdate.getClassName()); + } + + @Test + public void testAlterFunctionNonExists() { + String alterUndefinedFunction = "ALTER FUNCTION default_catalog.default_database.f4" + + " as 'org.apache.flink.function.TestFunction'"; + + String alterFunctionInWrongCatalog = "ALTER FUNCTION catalog1.default_database.f4 " + + "as 'org.apache.flink.function.TestFunction'"; + + String alterFunctionInWrongDB = "ALTER FUNCTION default_catalog.db1.f4 " + + "as 'org.apache.flink.function.TestFunction'"; + + try { + tEnv().sqlUpdate(alterUndefinedFunction); + fail(); + } catch (Exception e){ + assertEquals(e.getMessage(), + "Function default_database.f4 does not exist in Catalog default_catalog."); + } + + try { + tEnv().sqlUpdate(alterFunctionInWrongCatalog); + fail(); + } catch (Exception e) { + assertEquals("Catalog catalog1 does not exist", e.getMessage()); + } + + try { + tEnv().sqlUpdate(alterFunctionInWrongDB); + fail(); + } catch (Exception e) { + assertEquals(e.getMessage(), "Function db1.f4 does not exist" + + " in Catalog default_catalog."); + } + } + + @Test + public void testAlterTemporaryCatalogFunction() { + String alterTemporary = "ALTER TEMPORARY FUNCTION default_catalog.default_database.f4" + + " as 'org.apache.flink.function.TestFunction'"; + + try { + tEnv().sqlUpdate(alterTemporary); + fail(); + } catch (Exception e) { + assertEquals("Alter temporary catalog function is not supported", e.getMessage()); + } + } + + @Test + public void testAlterTemporarySystemFunction() { + String alterTemporary = "ALTER TEMPORARY SYSTEM FUNCTION default_catalog.default_database.f4" + + " as 'org.apache.flink.function.TestFunction'"; + + try { + tEnv().sqlUpdate(alterTemporary); + fail(); + } catch (Exception e) { + assertEquals("Alter temporary system function is not supported", e.getMessage()); + } + } + + @Test + public void testDropFunctionNonExists() { + String dropUndefinedFunction = "DROP FUNCTION default_catalog.default_database.f4"; + + String dropFunctionInWrongCatalog = "DROP FUNCTION catalog1.default_database.f4"; + + String dropFunctionInWrongDB = "DROP FUNCTION default_catalog.db1.f4"; + + try { + tEnv().sqlUpdate(dropUndefinedFunction); + fail(); + } catch (Exception e){ + assertEquals(e.getMessage(), + "Function default_database.f4 does not exist in Catalog default_catalog."); + } + + try { + tEnv().sqlUpdate(dropFunctionInWrongCatalog); + fail(); + } catch (Exception e) { + assertEquals("Catalog catalog1 does not exist", e.getMessage()); + } + + try { + tEnv().sqlUpdate(dropFunctionInWrongDB); + fail(); + } catch (Exception e) { + assertEquals(e.getMessage(), + "Function db1.f4 does not exist in Catalog default_catalog."); + } + } + + @Test + public void testDropTemporaryFunctionNonExits() { + String dropUndefinedFunction = "DROP TEMPORARY FUNCTION default_catalog.default_database.f4"; + String dropFunctionInWrongCatalog = "DROP TEMPORARY FUNCTION catalog1.default_database.f4"; + String dropFunctionInWrongDB = "DROP TEMPORARY FUNCTION default_catalog.db1.f4"; + + try { + tEnv().sqlUpdate(dropUndefinedFunction); + fail(); + } catch (Exception e){ + assertEquals(e.getMessage(), "Temporary catalog function" + + " `default_catalog`.`default_database`.`f4` doesn't exist"); + } + + try { + tEnv().sqlUpdate(dropFunctionInWrongCatalog); + fail(); + } catch (Exception e) { + assertEquals(e.getMessage(), "Temporary catalog function " + + "`catalog1`.`default_database`.`f4` doesn't exist"); + } + + try { + tEnv().sqlUpdate(dropFunctionInWrongDB); + fail(); + } catch (Exception e) { + assertEquals(e.getMessage(), "Temporary catalog function " + + "`default_catalog`.`db1`.`f4` doesn't exist"); + } } @Test - public void testUseDefinedRegularCatalogFunction() throws Exception { - String functionDDL = "create function addOne as " + - "'org.apache.flink.table.planner.functions.FunctionTestBase$TestUDF'"; + public void testCreateDropTemporaryCatalogFunctionsWithDifferentIdentifier() { + String createNoCatalogDB = "create temporary function f4" + + " as '" + TEST_FUNCTION + "'"; + + String dropNoCatalogDB = "drop temporary function f4"; + + tEnv().sqlUpdate(createNoCatalogDB); + tEnv().sqlUpdate(dropNoCatalogDB); + + String createNonExistsCatalog = "create temporary function catalog1.default_database.f4" + + " as '" + TEST_FUNCTION + "'"; + + String dropNonExistsCatalog = "drop temporary function catalog1.default_database.f4"; + + tEnv().sqlUpdate(createNonExistsCatalog); + tEnv().sqlUpdate(dropNonExistsCatalog); + + String createNonExistsDB = "create temporary function default_catalog.db1.f4" + + " as '" + TEST_FUNCTION + "'"; + + String dropNonExistsDB = "drop temporary function default_catalog.db1.f4"; + + tEnv().sqlUpdate(createNonExistsDB); + tEnv().sqlUpdate(dropNonExistsDB); + } + + @Test + public void testDropTemporarySystemFunction() { + String ddl1 = "create temporary system function f5 as '" + TEST_FUNCTION + "'"; + + String ddl2 = "drop temporary system function f5"; + + String ddl3 = "drop temporary system function if exists f5"; + + tEnv().sqlUpdate(ddl1); + tEnv().sqlUpdate(ddl2); + tEnv().sqlUpdate(ddl3); + + try { + tEnv().sqlUpdate(ddl2); + } catch (Exception e) { + assertEquals(e.getMessage(), "Temporary system function f5 doesn't exist"); + } + } + + @Test + public void testUserDefinedRegularCatalogFunction() throws Exception { + String functionDDL = "create function addOne as '" + TEST_FUNCTION + "'"; String dropFunctionDDL = "drop function addOne"; - testUseDefinedCatalogFunction(functionDDL); + testUserDefinedCatalogFunction(functionDDL); // delete the function - tableEnv.sqlUpdate(dropFunctionDDL); + tEnv().sqlUpdate(dropFunctionDDL); } @Test - public void testUseDefinedTemporaryCatalogFunction() throws Exception { - String functionDDL = "create temporary function addOne as " + - "'org.apache.flink.table.planner.functions.FunctionTestBase$TestUDF'"; + public void testUserDefinedTemporaryCatalogFunction() throws Exception { + String functionDDL = "create temporary function addOne as '" + TEST_FUNCTION + "'"; String dropFunctionDDL = "drop temporary function addOne"; - testUseDefinedCatalogFunction(functionDDL); + testUserDefinedCatalogFunction(functionDDL); // delete the function - tableEnv.sqlUpdate(dropFunctionDDL); + tEnv().sqlUpdate(dropFunctionDDL); } @Test - public void testUseDefinedTemporarySystemFunction() throws Exception { - String functionDDL = "create temporary system function addOne as " + - "'org.apache.flink.table.planner.functions.FunctionTestBase$TestUDF'"; + public void testUserDefinedTemporarySystemFunction() throws Exception { + String functionDDL = "create temporary system function addOne as '" + TEST_FUNCTION + "'"; String dropFunctionDDL = "drop temporary system function addOne"; - testUseDefinedCatalogFunction(functionDDL); + testUserDefinedCatalogFunction(functionDDL); // delete the function - tableEnv.sqlUpdate(dropFunctionDDL); + tEnv().sqlUpdate(dropFunctionDDL); + } + + /** + * Test udf class. + */ + public static class TestUDF extends ScalarFunction { + + public Integer eval(Integer a, Integer b) { + return a + b; + } } - // This test case only works for stream mode - private void testUseDefinedCatalogFunction(String createFunctionDDL) throws Exception { + private void testUserDefinedCatalogFunction(String createFunctionDDL) throws Exception { List<Row> sourceData = Arrays.asList( - toRow(1, "1000", 2), - toRow(2, "1", 3), - toRow(3, "2000", 4), - toRow(1, "2", 2), - toRow(2, "3000", 3) + Row.of(1, "1000", 2), + Row.of(2, "1", 3), + Row.of(3, "2000", 4), + Row.of(1, "2", 2), + Row.of(2, "3000", 3) ); TestCollectionTableFactory.reset(); - TestCollectionTableFactory.initData(sourceData, new ArrayList<Row>(), -1); + TestCollectionTableFactory.initData(sourceData, new ArrayList<>(), -1); String sourceDDL = "create table t1(a int, b varchar, c int) with ('connector' = 'COLLECTION')"; String sinkDDL = "create table t2(a int, b varchar, c int) with ('connector' = 'COLLECTION')"; String query = "select t1.a, t1.b, addOne(t1.a, 1) as c from t1"; - tableEnv.sqlUpdate(sourceDDL); - tableEnv.sqlUpdate(sinkDDL); - tableEnv.sqlUpdate(createFunctionDDL); - Table t2 = tableEnv.sqlQuery(query); - tableEnv.insertInto("t2", t2); - tableEnv.execute("job1"); + tEnv().sqlUpdate(sourceDDL); + tEnv().sqlUpdate(sinkDDL); + tEnv().sqlUpdate(createFunctionDDL); + Table t2 = tEnv().sqlQuery(query); + tEnv().insertInto("t2", t2); + tEnv().execute("job1"); Row[] result = TestCollectionTableFactory.RESULT().toArray(new Row[0]); Row[] expected = sourceData.toArray(new Row[0]); assertArrayEquals(expected, result); - tableEnv.sqlUpdate("drop table t1"); - tableEnv.sqlUpdate("drop table t2"); + tEnv().sqlUpdate("drop table t1"); + tEnv().sqlUpdate("drop table t2"); } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/functions/FunctionTestBase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/functions/FunctionTestBase.java deleted file mode 100644 index 5815da6..0000000 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/functions/FunctionTestBase.java +++ /dev/null @@ -1,436 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.functions; - -import org.apache.flink.table.api.TableEnvironment; -import org.apache.flink.table.api.ValidationException; -import org.apache.flink.table.catalog.Catalog; -import org.apache.flink.table.catalog.CatalogFunction; -import org.apache.flink.table.catalog.ObjectPath; -import org.apache.flink.table.factories.utils.TestCollectionTableFactory; -import org.apache.flink.types.Row; - -import org.junit.Test; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -/** - * Tests for both catalog and system function. - */ -public abstract class FunctionTestBase { - private static TableEnvironment tableEnv; - - public static void setTableEnv(TableEnvironment e) { - tableEnv = e; - } - - public abstract void execute() throws Exception; - - @Test - public void testCreateCatalogFunctionInDefaultCatalog() { - String ddl1 = "create function f1 as 'org.apache.flink.function.TestFunction'"; - tableEnv.sqlUpdate(ddl1); - assertTrue(Arrays.asList(tableEnv.listFunctions()).contains("f1")); - - tableEnv.sqlUpdate("DROP FUNCTION IF EXISTS default_catalog.default_database.f1"); - assertFalse(Arrays.asList(tableEnv.listFunctions()).contains("f1")); - } - - @Test - public void testCreateFunctionWithFullPath() { - String ddl1 = "create function default_catalog.default_database.f2 as" + - " 'org.apache.flink.function.TestFunction'"; - tableEnv.sqlUpdate(ddl1); - assertTrue(Arrays.asList(tableEnv.listFunctions()).contains("f2")); - - tableEnv.sqlUpdate("DROP FUNCTION IF EXISTS default_catalog.default_database.f2"); - assertFalse(Arrays.asList(tableEnv.listFunctions()).contains("f2")); - } - - @Test - public void testCreateFunctionWithoutCatalogIdentifier() { - String ddl1 = "create function default_database.f3 as" + - " 'org.apache.flink.function.TestFunction'"; - tableEnv.sqlUpdate(ddl1); - assertTrue(Arrays.asList(tableEnv.listFunctions()).contains("f3")); - - tableEnv.sqlUpdate("DROP FUNCTION IF EXISTS default_catalog.default_database.f3"); - assertFalse(Arrays.asList(tableEnv.listFunctions()).contains("f3")); - } - - @Test - public void testCreateFunctionCatalogNotExists() { - - String ddl1 = "create function catalog1.database1.f3 as 'org.apache.flink.function.TestFunction'"; - - try { - tableEnv.sqlUpdate(ddl1); - } catch (Exception e){ - assertTrue(e.getMessage().equals("Catalog catalog1 does not exist")); - } - } - - @Test - public void testCreateFunctionDBNotExists() { - String ddl1 = "create function default_catalog.database1.f3 as 'org.apache.flink.function.TestFunction'"; - - try { - tableEnv.sqlUpdate(ddl1); - } catch (Exception e){ - assertEquals(e.getMessage(), "Could not execute CREATE CATALOG FUNCTION:" + - " (catalogFunction: [Optional[This is a user-defined function]], identifier:" + - " [`default_catalog`.`database1`.`f3`], ignoreIfExists: [false])"); - } - } - - @Test - public void testCreateTemporaryCatalogFunction() { - String ddl1 = "create temporary function default_catalog.default_database.f4" + - " as 'org.apache.flink.table.functions.FunctionTestBase$TestUDF'"; - - String ddl2 = "create temporary function if not exists default_catalog.default_database.f4" + - " as 'org.apache.flink.table.functions.FunctionTestBase$TestUDF'"; - - String ddl3 = "drop temporary function default_catalog.default_database.f4"; - - String ddl4 = "drop temporary function if exists default_catalog.default_database.f4"; - - tableEnv.sqlUpdate(ddl1); - assertTrue(Arrays.asList(tableEnv.listFunctions()).contains("f4")); - - tableEnv.sqlUpdate(ddl2); - assertTrue(Arrays.asList(tableEnv.listFunctions()).contains("f4")); - - tableEnv.sqlUpdate(ddl3); - assertFalse(Arrays.asList(tableEnv.listFunctions()).contains("f4")); - - tableEnv.sqlUpdate(ddl1); - try { - tableEnv.sqlUpdate(ddl1); - } catch (Exception e) { - assertTrue(e instanceof ValidationException); - assertEquals(e.getMessage(), - "Temporary catalog function `default_catalog`.`default_database`.`f4`" + - " is already defined"); - } - - tableEnv.sqlUpdate(ddl3); - tableEnv.sqlUpdate(ddl4); - try { - tableEnv.sqlUpdate(ddl3); - } catch (Exception e) { - assertTrue(e instanceof ValidationException); - assertEquals(e.getMessage(), - "Temporary catalog function `default_catalog`.`default_database`.`f4`" + - " doesn't exist"); - } - } - - @Test - public void testCreateTemporarySystemFunction() { - String ddl1 = "create temporary system function default_catalog.default_database.f5" + - " as 'org.apache.flink.table.functions.FunctionTestBase$TestUDF'"; - - String ddl2 = "create temporary system function if not exists default_catalog.default_database.f5" + - " as 'org.apache.flink.table.functions.CatalogFunctionTestBase$TestUDF'"; - - String ddl3 = "drop temporary system function default_catalog.default_database.f5"; - - tableEnv.sqlUpdate(ddl1); - tableEnv.sqlUpdate(ddl2); - tableEnv.sqlUpdate(ddl3); - } - - @Test - public void testAlterFunction() throws Exception { - String create = "create function f3 as 'org.apache.flink.function.TestFunction'"; - String alter = "alter function f3 as 'org.apache.flink.function.TestFunction2'"; - - ObjectPath objectPath = new ObjectPath("default_database", "f3"); - Catalog catalog = tableEnv.getCatalog("default_catalog").get(); - tableEnv.sqlUpdate(create); - CatalogFunction beforeUpdate = catalog.getFunction(objectPath); - assertEquals("org.apache.flink.function.TestFunction", beforeUpdate.getClassName()); - - tableEnv.sqlUpdate(alter); - CatalogFunction afterUpdate = catalog.getFunction(objectPath); - assertEquals("org.apache.flink.function.TestFunction2", afterUpdate.getClassName()); - } - - @Test - public void testAlterFunctionNonExists() { - String alterUndefinedFunction = "ALTER FUNCTION default_catalog.default_database.f4" + - " as 'org.apache.flink.function.TestFunction'"; - - String alterFunctionInWrongCatalog = "ALTER FUNCTION catalog1.default_database.f4 " + - "as 'org.apache.flink.function.TestFunction'"; - - String alterFunctionInWrongDB = "ALTER FUNCTION default_catalog.db1.f4 " + - "as 'org.apache.flink.function.TestFunction'"; - - try { - tableEnv.sqlUpdate(alterUndefinedFunction); - fail(); - } catch (Exception e){ - assertEquals(e.getMessage(), - "Function default_database.f4 does not exist in Catalog default_catalog."); - } - - try { - tableEnv.sqlUpdate(alterFunctionInWrongCatalog); - fail(); - } catch (Exception e) { - assertTrue(e.getMessage().equals("Catalog catalog1 does not exist")); - } - - try { - tableEnv.sqlUpdate(alterFunctionInWrongDB); - fail(); - } catch (Exception e) { - assertEquals(e.getMessage(), "Function db1.f4 does not exist" + - " in Catalog default_catalog."); - } - } - - @Test - public void testAlterTemporaryCatalogFunction() { - String alterTemporary = "ALTER TEMPORARY FUNCTION default_catalog.default_database.f4" + - " as 'org.apache.flink.function.TestFunction'"; - - try { - tableEnv.sqlUpdate(alterTemporary); - fail(); - } catch (Exception e) { - assertTrue(e.getMessage().equals("Alter temporary catalog function is not supported")); - } - } - - @Test - public void testAlterTemporarySystemFunction() { - String alterTemporary = "ALTER TEMPORARY SYSTEM FUNCTION default_catalog.default_database.f4" + - " as 'org.apache.flink.function.TestFunction'"; - - try { - tableEnv.sqlUpdate(alterTemporary); - fail(); - } catch (Exception e) { - assertTrue(e.getMessage().equals("Alter temporary system function is not supported")); - } - } - - @Test - public void testDropFunctionNonExists() { - String dropUndefinedFunction = "DROP FUNCTION default_catalog.default_database.f4"; - - String dropFunctionInWrongCatalog = "DROP FUNCTION catalog1.default_database.f4"; - - String dropFunctionInWrongDB = "DROP FUNCTION default_catalog.db1.f4"; - - try { - tableEnv.sqlUpdate(dropUndefinedFunction); - fail(); - } catch (Exception e){ - assertEquals(e.getMessage(), - "Function default_database.f4 does not exist in Catalog default_catalog."); - } - - try { - tableEnv.sqlUpdate(dropFunctionInWrongCatalog); - fail(); - } catch (Exception e) { - assertTrue(e.getMessage().equals("Catalog catalog1 does not exist")); - } - - try { - tableEnv.sqlUpdate(dropFunctionInWrongDB); - fail(); - } catch (Exception e) { - assertEquals(e.getMessage(), - "Function db1.f4 does not exist in Catalog default_catalog."); - } - } - - @Test - public void testDropTemporaryFunctionNonExits() { - String dropUndefinedFunction = "DROP TEMPORARY FUNCTION default_catalog.default_database.f4"; - String dropFunctionInWrongCatalog = "DROP TEMPORARY FUNCTION catalog1.default_database.f4"; - String dropFunctionInWrongDB = "DROP TEMPORARY FUNCTION default_catalog.db1.f4"; - - try { - tableEnv.sqlUpdate(dropUndefinedFunction); - fail(); - } catch (Exception e){ - assertEquals(e.getMessage(), "Temporary catalog function" + - " `default_catalog`.`default_database`.`f4` doesn't exist"); - } - - try { - tableEnv.sqlUpdate(dropFunctionInWrongCatalog); - fail(); - } catch (Exception e) { - assertEquals(e.getMessage(), "Temporary catalog function " + - "`catalog1`.`default_database`.`f4` doesn't exist"); - } - - try { - tableEnv.sqlUpdate(dropFunctionInWrongDB); - fail(); - } catch (Exception e) { - assertEquals(e.getMessage(), "Temporary catalog function " + - "`default_catalog`.`db1`.`f4` doesn't exist"); - } - } - - @Test - public void testCreateDropTemporaryCatalogFunctionsWithDifferentIdentifier() { - String createNoCatalogDB = "create temporary function f4" + - " as 'org.apache.flink.table.functions.FunctionTestBase$TestUDF'"; - - String dropNoCatalogDB = "drop temporary function f4"; - - tableEnv.sqlUpdate(createNoCatalogDB); - tableEnv.sqlUpdate(dropNoCatalogDB); - - String createNonExistsCatalog = "create temporary function catalog1.default_database.f4" + - " as 'org.apache.flink.table.functions.FunctionTestBase$TestUDF'"; - - String dropNonExistsCatalog = "drop temporary function catalog1.default_database.f4"; - - tableEnv.sqlUpdate(createNonExistsCatalog); - tableEnv.sqlUpdate(dropNonExistsCatalog); - - String createNonExistsDB = "create temporary function default_catalog.db1.f4" + - " as 'org.apache.flink.table.functions.FunctionTestBase$TestUDF'"; - - String dropNonExistsDB = "drop temporary function default_catalog.db1.f4"; - - tableEnv.sqlUpdate(createNonExistsDB); - tableEnv.sqlUpdate(dropNonExistsDB); - } - - @Test - public void testDropTemporarySystemFunction() { - String ddl1 = "create temporary system function f5" + - " as 'org.apache.flink.table.functions.FunctionTestBase$TestUDF'"; - - String ddl2 = "drop temporary system function f5"; - - String ddl3 = "drop temporary system function if exists f5"; - - tableEnv.sqlUpdate(ddl1); - tableEnv.sqlUpdate(ddl2); - tableEnv.sqlUpdate(ddl3); - - try { - tableEnv.sqlUpdate(ddl2); - } catch (Exception e) { - assertEquals(e.getMessage(), "Temporary system function f5 doesn't exist"); - } - } - - @Test - public void testUseDefinedRegularCatalogFunction() throws Exception { - String functionDDL = "create function addOne as " + - "'org.apache.flink.table.functions.FunctionTestBase$TestUDF'"; - - String dropFunctionDDL = "drop function addOne"; - testUseDefinedCatalogFunction(functionDDL); - // delete the function - tableEnv.sqlUpdate(dropFunctionDDL); - } - - @Test - public void testUseDefinedTemporaryCatalogFunction() throws Exception { - String functionDDL = "create temporary function addOne as " + - "'org.apache.flink.table.functions.FunctionTestBase$TestUDF'"; - - String dropFunctionDDL = "drop temporary function addOne"; - testUseDefinedCatalogFunction(functionDDL); - // delete the function - tableEnv.sqlUpdate(dropFunctionDDL); - } - - @Test - public void testUseDefinedTemporarySystemFunction() throws Exception { - String functionDDL = "create temporary system function addOne as " + - "'org.apache.flink.table.functions.FunctionTestBase$TestUDF'"; - - String dropFunctionDDL = "drop temporary system function addOne"; - testUseDefinedCatalogFunction(functionDDL); - // delete the function - tableEnv.sqlUpdate(dropFunctionDDL); - } - - private void testUseDefinedCatalogFunction(String createFunctionDDL) throws Exception { - List<Row> sourceData = Arrays.asList( - toRow(1, "1000", 2), - toRow(2, "1", 3), - toRow(3, "2000", 4), - toRow(1, "2", 2), - toRow(2, "3000", 3) - ); - - TestCollectionTableFactory.reset(); - TestCollectionTableFactory.initData(sourceData, new ArrayList<Row>(), -1); - - String sourceDDL = "create table t1(a int, b varchar, c int) with ('connector' = 'COLLECTION')"; - String sinkDDL = "create table t2(a int, b varchar, c int) with ('connector' = 'COLLECTION')"; - String query = " insert into t2 select t1.a, t1.b, addOne(t1.a, 1) as c from t1"; - - tableEnv.sqlUpdate(sourceDDL); - tableEnv.sqlUpdate(sinkDDL); - tableEnv.sqlUpdate(createFunctionDDL); - tableEnv.sqlUpdate(query); - execute(); - - Row[] result = TestCollectionTableFactory.RESULT().toArray(new Row[0]); - Row[] expected = sourceData.toArray(new Row[0]); - assertArrayEquals(expected, result); - - tableEnv.sqlUpdate("drop table t1"); - tableEnv.sqlUpdate("drop table t2"); - } - - private Row toRow(Object ... objects) { - Row row = new Row(objects.length); - for (int i = 0; i < objects.length; i++) { - row.setField(i, objects[i]); - } - - return row; - } - - /** - * Test udf class. - */ - public static class TestUDF extends ScalarFunction { - - public Integer eval(Integer a, Integer b) { - return a + b; - } - } -} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/batch/sql/FunctionITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/batch/sql/FunctionITCase.java deleted file mode 100644 index 7e2a10f..0000000 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/batch/sql/FunctionITCase.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.runtime.batch.sql; - -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.table.api.java.BatchTableEnvironment; -import org.apache.flink.table.functions.FunctionTestBase; - -import org.junit.BeforeClass; - -/** - * Tests for catalog and system function in batch table environment. - */ -public class FunctionITCase extends FunctionTestBase { - private static ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment(); - - @BeforeClass - public static void setup() { - BatchTableEnvironment batchTableEnvironment = BatchTableEnvironment.create(executionEnvironment); - setTableEnv(batchTableEnvironment); - } - - @Override - public void execute() throws Exception { - executionEnvironment.execute(); - } -} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/sql/FunctionITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/sql/FunctionITCase.java index 045c4b0..6ccfc02 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/sql/FunctionITCase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/sql/FunctionITCase.java @@ -19,26 +19,431 @@ package org.apache.flink.table.runtime.stream.sql; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.api.java.StreamTableEnvironment; -import org.apache.flink.table.functions.FunctionTestBase; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CatalogFunction; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.factories.utils.TestCollectionTableFactory; +import org.apache.flink.table.functions.ScalarFunction; +import org.apache.flink.test.util.AbstractTestBase; +import org.apache.flink.types.Row; -import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; /** - * Tests for catalog and system function in stream table environment. + * Tests for both catalog and system function. */ -public class FunctionITCase extends FunctionTestBase { - private static StreamExecutionEnvironment streamExecEnvironment; +public class FunctionITCase extends AbstractTestBase { + + private static final String TEST_FUNCTION = TestUDF.class.getName(); + + @Test + public void testCreateCatalogFunctionInDefaultCatalog() { + TableEnvironment tableEnv = getTableEnvironment(); + String ddl1 = "create function f1 as 'org.apache.flink.function.TestFunction'"; + tableEnv.sqlUpdate(ddl1); + assertTrue(Arrays.asList(tableEnv.listFunctions()).contains("f1")); + + tableEnv.sqlUpdate("DROP FUNCTION IF EXISTS default_catalog.default_database.f1"); + assertFalse(Arrays.asList(tableEnv.listFunctions()).contains("f1")); + } + + @Test + public void testCreateFunctionWithFullPath() { + TableEnvironment tableEnv = getTableEnvironment(); + String ddl1 = "create function default_catalog.default_database.f2 as" + + " 'org.apache.flink.function.TestFunction'"; + tableEnv.sqlUpdate(ddl1); + assertTrue(Arrays.asList(tableEnv.listFunctions()).contains("f2")); + + tableEnv.sqlUpdate("DROP FUNCTION IF EXISTS default_catalog.default_database.f2"); + assertFalse(Arrays.asList(tableEnv.listFunctions()).contains("f2")); + } + + @Test + public void testCreateFunctionWithoutCatalogIdentifier() { + TableEnvironment tableEnv = getTableEnvironment(); + String ddl1 = "create function default_database.f3 as" + + " 'org.apache.flink.function.TestFunction'"; + tableEnv.sqlUpdate(ddl1); + assertTrue(Arrays.asList(tableEnv.listFunctions()).contains("f3")); + + tableEnv.sqlUpdate("DROP FUNCTION IF EXISTS default_catalog.default_database.f3"); + assertFalse(Arrays.asList(tableEnv.listFunctions()).contains("f3")); + } + + @Test + public void testCreateFunctionCatalogNotExists() { + TableEnvironment tableEnv = getTableEnvironment(); + String ddl1 = "create function catalog1.database1.f3 as 'org.apache.flink.function.TestFunction'"; + + try { + tableEnv.sqlUpdate(ddl1); + } catch (Exception e){ + assertEquals("Catalog catalog1 does not exist", e.getMessage()); + } + } + + @Test + public void testCreateFunctionDBNotExists() { + TableEnvironment tableEnv = getTableEnvironment(); + String ddl1 = "create function default_catalog.database1.f3 as 'org.apache.flink.function.TestFunction'"; + + try { + tableEnv.sqlUpdate(ddl1); + } catch (Exception e){ + assertEquals(e.getMessage(), "Could not execute CREATE CATALOG FUNCTION:" + + " (catalogFunction: [Optional[This is a user-defined function]], identifier:" + + " [`default_catalog`.`database1`.`f3`], ignoreIfExists: [false])"); + } + } + + @Test + public void testCreateTemporaryCatalogFunction() { + TableEnvironment tableEnv = getTableEnvironment(); + String ddl1 = "create temporary function default_catalog.default_database.f4" + + " as '" + TEST_FUNCTION + "'"; + + String ddl2 = "create temporary function if not exists default_catalog.default_database.f4" + + " as '" + TEST_FUNCTION + "'"; + + String ddl3 = "drop temporary function default_catalog.default_database.f4"; + + String ddl4 = "drop temporary function if exists default_catalog.default_database.f4"; + + tableEnv.sqlUpdate(ddl1); + assertTrue(Arrays.asList(tableEnv.listFunctions()).contains("f4")); + + tableEnv.sqlUpdate(ddl2); + assertTrue(Arrays.asList(tableEnv.listFunctions()).contains("f4")); + + tableEnv.sqlUpdate(ddl3); + assertFalse(Arrays.asList(tableEnv.listFunctions()).contains("f4")); + + tableEnv.sqlUpdate(ddl1); + try { + tableEnv.sqlUpdate(ddl1); + } catch (Exception e) { + assertTrue(e instanceof ValidationException); + assertEquals(e.getMessage(), + "Temporary catalog function `default_catalog`.`default_database`.`f4`" + + " is already defined"); + } + + tableEnv.sqlUpdate(ddl3); + tableEnv.sqlUpdate(ddl4); + try { + tableEnv.sqlUpdate(ddl3); + } catch (Exception e) { + assertTrue(e instanceof ValidationException); + assertEquals(e.getMessage(), + "Temporary catalog function `default_catalog`.`default_database`.`f4`" + + " doesn't exist"); + } + } + + @Test + public void testCreateTemporarySystemFunction() { + TableEnvironment tableEnv = getTableEnvironment(); + String ddl1 = "create temporary system function default_catalog.default_database.f5" + + " as '" + TEST_FUNCTION + "'"; + + String ddl2 = "create temporary system function if not exists default_catalog.default_database.f5" + + " as 'org.apache.flink.table.functions.CatalogFunctionTestBase$TestUDF'"; + + String ddl3 = "drop temporary system function default_catalog.default_database.f5"; + + tableEnv.sqlUpdate(ddl1); + tableEnv.sqlUpdate(ddl2); + tableEnv.sqlUpdate(ddl3); + } + + @Test + public void testAlterFunction() throws Exception { + TableEnvironment tableEnv = getTableEnvironment(); + String create = "create function f3 as 'org.apache.flink.function.TestFunction'"; + String alter = "alter function f3 as 'org.apache.flink.function.TestFunction2'"; + + ObjectPath objectPath = new ObjectPath("default_database", "f3"); + assertTrue(tableEnv.getCatalog("default_catalog").isPresent()); + Catalog catalog = tableEnv.getCatalog("default_catalog").get(); + tableEnv.sqlUpdate(create); + CatalogFunction beforeUpdate = catalog.getFunction(objectPath); + assertEquals("org.apache.flink.function.TestFunction", beforeUpdate.getClassName()); + + tableEnv.sqlUpdate(alter); + CatalogFunction afterUpdate = catalog.getFunction(objectPath); + assertEquals("org.apache.flink.function.TestFunction2", afterUpdate.getClassName()); + } + + @Test + public void testAlterFunctionNonExists() { + TableEnvironment tableEnv = getTableEnvironment(); + String alterUndefinedFunction = "ALTER FUNCTION default_catalog.default_database.f4" + + " as 'org.apache.flink.function.TestFunction'"; + + String alterFunctionInWrongCatalog = "ALTER FUNCTION catalog1.default_database.f4 " + + "as 'org.apache.flink.function.TestFunction'"; + + String alterFunctionInWrongDB = "ALTER FUNCTION default_catalog.db1.f4 " + + "as 'org.apache.flink.function.TestFunction'"; + + try { + tableEnv.sqlUpdate(alterUndefinedFunction); + fail(); + } catch (Exception e){ + assertEquals(e.getMessage(), + "Function default_database.f4 does not exist in Catalog default_catalog."); + } + + try { + tableEnv.sqlUpdate(alterFunctionInWrongCatalog); + fail(); + } catch (Exception e) { + assertEquals("Catalog catalog1 does not exist", e.getMessage()); + } + + try { + tableEnv.sqlUpdate(alterFunctionInWrongDB); + fail(); + } catch (Exception e) { + assertEquals(e.getMessage(), "Function db1.f4 does not exist" + + " in Catalog default_catalog."); + } + } + + @Test + public void testAlterTemporaryCatalogFunction() { + TableEnvironment tableEnv = getTableEnvironment(); + String alterTemporary = "ALTER TEMPORARY FUNCTION default_catalog.default_database.f4" + + " as 'org.apache.flink.function.TestFunction'"; + + try { + tableEnv.sqlUpdate(alterTemporary); + fail(); + } catch (Exception e) { + assertEquals("Alter temporary catalog function is not supported", e.getMessage()); + } + } + + @Test + public void testAlterTemporarySystemFunction() { + TableEnvironment tableEnv = getTableEnvironment(); + String alterTemporary = "ALTER TEMPORARY SYSTEM FUNCTION default_catalog.default_database.f4" + + " as 'org.apache.flink.function.TestFunction'"; + + try { + tableEnv.sqlUpdate(alterTemporary); + fail(); + } catch (Exception e) { + assertEquals("Alter temporary system function is not supported", e.getMessage()); + } + } + + @Test + public void testDropFunctionNonExists() { + TableEnvironment tableEnv = getTableEnvironment(); + String dropUndefinedFunction = "DROP FUNCTION default_catalog.default_database.f4"; + + String dropFunctionInWrongCatalog = "DROP FUNCTION catalog1.default_database.f4"; + + String dropFunctionInWrongDB = "DROP FUNCTION default_catalog.db1.f4"; + + try { + tableEnv.sqlUpdate(dropUndefinedFunction); + fail(); + } catch (Exception e){ + assertEquals(e.getMessage(), + "Function default_database.f4 does not exist in Catalog default_catalog."); + } + + try { + tableEnv.sqlUpdate(dropFunctionInWrongCatalog); + fail(); + } catch (Exception e) { + assertEquals("Catalog catalog1 does not exist", e.getMessage()); + } + + try { + tableEnv.sqlUpdate(dropFunctionInWrongDB); + fail(); + } catch (Exception e) { + assertEquals(e.getMessage(), + "Function db1.f4 does not exist in Catalog default_catalog."); + } + } + + @Test + public void testDropTemporaryFunctionNonExits() { + TableEnvironment tableEnv = getTableEnvironment(); + String dropUndefinedFunction = "DROP TEMPORARY FUNCTION default_catalog.default_database.f4"; + String dropFunctionInWrongCatalog = "DROP TEMPORARY FUNCTION catalog1.default_database.f4"; + String dropFunctionInWrongDB = "DROP TEMPORARY FUNCTION default_catalog.db1.f4"; + + try { + tableEnv.sqlUpdate(dropUndefinedFunction); + fail(); + } catch (Exception e){ + assertEquals(e.getMessage(), "Temporary catalog function" + + " `default_catalog`.`default_database`.`f4` doesn't exist"); + } + + try { + tableEnv.sqlUpdate(dropFunctionInWrongCatalog); + fail(); + } catch (Exception e) { + assertEquals(e.getMessage(), "Temporary catalog function " + + "`catalog1`.`default_database`.`f4` doesn't exist"); + } + + try { + tableEnv.sqlUpdate(dropFunctionInWrongDB); + fail(); + } catch (Exception e) { + assertEquals(e.getMessage(), "Temporary catalog function " + + "`default_catalog`.`db1`.`f4` doesn't exist"); + } + } + + @Test + public void testCreateDropTemporaryCatalogFunctionsWithDifferentIdentifier() { + TableEnvironment tableEnv = getTableEnvironment(); + String createNoCatalogDB = "create temporary function f4" + + " as '" + TEST_FUNCTION + "'"; + + String dropNoCatalogDB = "drop temporary function f4"; + + tableEnv.sqlUpdate(createNoCatalogDB); + tableEnv.sqlUpdate(dropNoCatalogDB); + + String createNonExistsCatalog = "create temporary function catalog1.default_database.f4" + + " as '" + TEST_FUNCTION + "'"; + + String dropNonExistsCatalog = "drop temporary function catalog1.default_database.f4"; + + tableEnv.sqlUpdate(createNonExistsCatalog); + tableEnv.sqlUpdate(dropNonExistsCatalog); + + String createNonExistsDB = "create temporary function default_catalog.db1.f4" + + " as '" + TEST_FUNCTION + "'"; + + String dropNonExistsDB = "drop temporary function default_catalog.db1.f4"; + + tableEnv.sqlUpdate(createNonExistsDB); + tableEnv.sqlUpdate(dropNonExistsDB); + } + + @Test + public void testDropTemporarySystemFunction() { + TableEnvironment tableEnv = getTableEnvironment(); + String ddl1 = "create temporary system function f5" + + " as '" + TEST_FUNCTION + "'"; + + String ddl2 = "drop temporary system function f5"; + + String ddl3 = "drop temporary system function if exists f5"; + + tableEnv.sqlUpdate(ddl1); + tableEnv.sqlUpdate(ddl2); + tableEnv.sqlUpdate(ddl3); + + try { + tableEnv.sqlUpdate(ddl2); + } catch (Exception e) { + assertEquals(e.getMessage(), "Temporary system function f5 doesn't exist"); + } + } + + @Test + public void testUserDefinedRegularCatalogFunction() throws Exception { + TableEnvironment tableEnv = getTableEnvironment(); + String functionDDL = "create function addOne as " + + "'" + TEST_FUNCTION + "'"; + + String dropFunctionDDL = "drop function addOne"; + testUserDefinedCatalogFunction(tableEnv, functionDDL); + // delete the function + tableEnv.sqlUpdate(dropFunctionDDL); + } + + @Test + public void testUserDefinedTemporaryCatalogFunction() throws Exception { + TableEnvironment tableEnv = getTableEnvironment(); + String functionDDL = "create temporary function addOne as " + + "'" + TEST_FUNCTION + "'"; + + String dropFunctionDDL = "drop temporary function addOne"; + testUserDefinedCatalogFunction(tableEnv, functionDDL); + // delete the function + tableEnv.sqlUpdate(dropFunctionDDL); + } + + @Test + public void testUserDefinedTemporarySystemFunction() throws Exception { + TableEnvironment tableEnv = getTableEnvironment(); + String functionDDL = "create temporary system function addOne as " + + "'" + TEST_FUNCTION + "'"; + + String dropFunctionDDL = "drop temporary system function addOne"; + testUserDefinedCatalogFunction(tableEnv, functionDDL); + // delete the function + tableEnv.sqlUpdate(dropFunctionDDL); + } + + /** + * Test udf class. + */ + public static class TestUDF extends ScalarFunction { + + public Integer eval(Integer a, Integer b) { + return a + b; + } + } + + private void testUserDefinedCatalogFunction(TableEnvironment tableEnv, String createFunctionDDL) throws Exception { + List<Row> sourceData = Arrays.asList( + Row.of(1, "1000", 2), + Row.of(2, "1", 3), + Row.of(3, "2000", 4), + Row.of(1, "2", 2), + Row.of(2, "3000", 3) + ); + + TestCollectionTableFactory.reset(); + TestCollectionTableFactory.initData(sourceData, new ArrayList<>(), -1); + + String sourceDDL = "create table t1(a int, b varchar, c int) with ('connector' = 'COLLECTION')"; + String sinkDDL = "create table t2(a int, b varchar, c int) with ('connector' = 'COLLECTION')"; + String query = " insert into t2 select t1.a, t1.b, addOne(t1.a, 1) as c from t1"; + + tableEnv.sqlUpdate(sourceDDL); + tableEnv.sqlUpdate(sinkDDL); + tableEnv.sqlUpdate(createFunctionDDL); + tableEnv.sqlUpdate(query); + tableEnv.execute("Test job"); + + Row[] result = TestCollectionTableFactory.RESULT().toArray(new Row[0]); + Row[] expected = sourceData.toArray(new Row[0]); + assertArrayEquals(expected, result); - @BeforeClass - public static void setup() { - streamExecEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); - StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(streamExecEnvironment); - setTableEnv(streamTableEnvironment); + tableEnv.sqlUpdate("drop table t1"); + tableEnv.sqlUpdate("drop table t2"); } - @Override - public void execute() throws Exception { - streamExecEnvironment.execute(); + private TableEnvironment getTableEnvironment() { + StreamExecutionEnvironment streamExecEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); + return StreamTableEnvironment.create(streamExecEnvironment); } }
