[FLINK-8863] [sql-client] Make user-defined functions more robust

- Simplify code and fix various bugs
- Add more tests
- Refactor various names for descriptors and variables
- Make 'from' property mandatory
- Make LiteralValue public API
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/abfdc1a2 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/abfdc1a2 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/abfdc1a2 Branch: refs/heads/master Commit: abfdc1a2d649029d9f8a22c8d4880dc5068149c8 Parents: 8014dad Author: Timo Walther <twal...@apache.org> Authored: Mon Jul 9 23:07:38 2018 +0200 Committer: Timo Walther <twal...@apache.org> Committed: Tue Jul 10 13:55:21 2018 +0200 ---------------------------------------------------------------------- .../conf/sql-client-defaults.yaml | 18 +- .../flink/table/client/cli/CliClient.java | 19 ++ .../flink/table/client/cli/CliStrings.java | 1 + .../table/client/cli/SqlCommandParser.java | 1 + .../flink/table/client/config/Environment.java | 36 ++- .../flink/table/client/config/Source.java | 5 +- .../client/config/UserDefinedFunction.java | 63 ++--- .../flink/table/client/gateway/Executor.java | 5 + .../client/gateway/local/ExecutionContext.java | 23 +- .../client/gateway/local/LocalExecutor.java | 8 + .../gateway/local/ExecutionContextTest.java | 14 ++ .../gateway/local/LocalExecutorITCase.java | 130 +++------- .../gateway/utils/UserDefinedFunctions.java | 16 +- .../resources/test-sql-client-defaults.yaml | 25 ++ .../src/test/resources/test-sql-client-udf.yaml | 82 ------- .../flink/table/descriptors/ClassInstance.scala | 238 +++++++++++++++++++ .../descriptors/ClassInstanceValidator.scala | 59 +++++ .../flink/table/descriptors/ClassType.scala | 123 ---------- .../table/descriptors/ClassTypeValidator.scala | 75 ------ .../flink/table/descriptors/Descriptor.scala | 8 +- .../descriptors/DescriptorProperties.scala | 114 +++++---- .../table/descriptors/FunctionDescriptor.scala | 38 +-- .../FunctionDescriptorValidator.scala | 57 +++++ .../table/descriptors/FunctionValidator.scala | 42 ---- .../table/descriptors/HierarchyDescriptor.scala | 7 +- .../HierarchyDescriptorValidator.scala | 14 +- 
.../flink/table/descriptors/LiteralValue.scala | 221 +++++++++++++++++ .../descriptors/LiteralValueValidator.scala | 144 +++++++++++ .../flink/table/descriptors/PrimitiveType.scala | 55 ----- .../descriptors/PrimitiveTypeValidator.scala | 146 ------------ .../descriptors/service/FunctionService.scala | 87 ------- .../flink/table/functions/FunctionService.scala | 156 ++++++++++++ .../table/descriptors/ClassInstanceTest.scala | 102 ++++++++ .../flink/table/descriptors/ClassTypeTest.scala | 77 ------ .../flink/table/descriptors/CsvTest.scala | 3 + .../descriptors/FunctionDescriptorTest.scala | 59 +++++ .../flink/table/descriptors/FunctionTest.scala | 65 ----- .../table/descriptors/LiteralValueTest.scala | 125 ++++++++++ .../table/descriptors/PrimitiveTypeTest.scala | 117 --------- .../table/functions/FunctionServiceTest.scala | 125 ++++++++++ 40 files changed, 1576 insertions(+), 1127 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/abfdc1a2/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml b/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml index 7ca7776..51e6e95 100644 --- a/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml +++ b/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml @@ -21,11 +21,14 @@ # Defaults might be overwritten by a session specific environment. +# See the Table API & SQL documentation for details about supported properties. + + #============================================================================== # Table Sources #============================================================================== -# Define table sources and sinks here. See the Table API & SQL documentation for details. +# Define table sources and sinks here. 
tables: [] # empty list # A typical table source definition looks like: @@ -36,6 +39,19 @@ tables: [] # empty list # schema: ... #============================================================================== +# User-defined functions +#============================================================================== + +# Define scalar, aggregate, or table functions here. + +functions: [] # empty list +# A typical function definition looks like: +# - name: ... +# from: class +# class: ... +# constructor: ... + +#============================================================================== # Execution properties #============================================================================== http://git-wip-us.apache.org/repos/asf/flink/blob/abfdc1a2/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java index 233e49b..7bf7bb7 100644 --- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java +++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java @@ -201,6 +201,9 @@ public class CliClient { case SHOW_TABLES: callShowTables(cmdCall); break; + case SHOW_FUNCTIONS: + callShowFunctions(cmdCall); + break; case DESCRIBE: callDescribe(cmdCall); break; @@ -284,6 +287,22 @@ public class CliClient { terminal.flush(); } + private void callShowFunctions(SqlCommandCall cmdCall) { + final List<String> functions; + try { + functions = executor.listUserDefinedFunctions(context); + } catch (SqlExecutionException e) { + printException(e); + return; + } + if (functions.isEmpty()) { + terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_EMPTY).toAnsi()); + } else { + 
functions.forEach((v) -> terminal.writer().println(v)); + } + terminal.flush(); + } + private void callDescribe(SqlCommandCall cmdCall) { final TableSchema schema; try { http://git-wip-us.apache.org/repos/asf/flink/blob/abfdc1a2/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java index 1e8f696..b917317 100644 --- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java +++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java @@ -45,6 +45,7 @@ public final class CliStrings { .append(formatCommand(SqlCommand.CLEAR, "Clears the current terminal.")) .append(formatCommand(SqlCommand.HELP, "Prints the available commands.")) .append(formatCommand(SqlCommand.SHOW_TABLES, "Shows all registered tables.")) + .append(formatCommand(SqlCommand.SHOW_FUNCTIONS, "Shows all registered user-defined functions.")) .append(formatCommand(SqlCommand.DESCRIBE, "Describes the schema of a table with the given name.")) .append(formatCommand(SqlCommand.EXPLAIN, "Describes the execution plan of a query or table with the given name.")) .append(formatCommand(SqlCommand.SELECT, "Executes a SQL SELECT query on the Flink cluster.")) http://git-wip-us.apache.org/repos/asf/flink/blob/abfdc1a2/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java index 
214a17d..5543a3e 100644 --- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java +++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java @@ -86,6 +86,7 @@ public final class SqlCommandParser { CLEAR("clear"), HELP("help"), SHOW_TABLES("show tables"), + SHOW_FUNCTIONS("show functions"), DESCRIBE("describe"), EXPLAIN("explain"), SELECT("select"), http://git-wip-us.apache.org/repos/asf/flink/blob/abfdc1a2/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java index c1db4c1..b26c45f 100644 --- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java +++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java @@ -42,17 +42,17 @@ public class Environment { private Map<String, TableDescriptor> tables; + private Map<String, UserDefinedFunction> functions; + private Execution execution; private Deployment deployment; - private Map<String, UserDefinedFunction> functions; - public Environment() { this.tables = Collections.emptyMap(); + this.functions = Collections.emptyMap(); this.execution = new Execution(); this.deployment = new Deployment(); - this.functions = Collections.emptyMap(); } public Map<String, TableDescriptor> getTables() { @@ -69,7 +69,7 @@ public class Environment { config.remove(TableDescriptorValidator.TABLE_TYPE()); final Source s = Source.create(config); if (this.tables.containsKey(s.getName())) { - throw new SqlClientException("Duplicate source name '" + s + "'."); + throw new SqlClientException("Duplicate source name '" + 
s.getName() + "'."); } this.tables.put(s.getName(), s); } else { @@ -79,11 +79,18 @@ public class Environment { }); } + public Map<String, UserDefinedFunction> getFunctions() { + return functions; + } + public void setFunctions(List<Map<String, Object>> functions) { this.functions = new HashMap<>(functions.size()); functions.forEach(config -> { final UserDefinedFunction f = UserDefinedFunction.create(config); - this.functions.put(f.name(), f); + if (this.tables.containsKey(f.getName())) { + throw new SqlClientException("Duplicate function name '" + f.getName() + "'."); + } + this.functions.put(f.getName(), f); }); } @@ -103,10 +110,6 @@ public class Environment { return deployment; } - public Map<String, UserDefinedFunction> getFunctions() { - return functions; - } - @Override public String toString() { final StringBuilder sb = new StringBuilder(); @@ -117,6 +120,13 @@ public class Environment { table.addProperties(props); props.asMap().forEach((k, v) -> sb.append(" ").append(k).append(": ").append(v).append('\n')); }); + sb.append("=================== Functions ====================\n"); + functions.forEach((name, function) -> { + sb.append("- name: ").append(name).append("\n"); + final DescriptorProperties props = new DescriptorProperties(true); + function.addProperties(props); + props.asMap().forEach((k, v) -> sb.append(" ").append(k).append(": ").append(v).append('\n')); + }); sb.append("=================== Execution ====================\n"); execution.toProperties().forEach((k, v) -> sb.append(k).append(": ").append(v).append('\n')); sb.append("=================== Deployment ===================\n"); @@ -153,7 +163,7 @@ public class Environment { // merge functions final Map<String, UserDefinedFunction> functions = new HashMap<>(env1.getFunctions()); - mergedEnv.getFunctions().putAll(env2.getFunctions()); + functions.putAll(env2.getFunctions()); mergedEnv.functions = functions; // merge execution properties @@ -165,12 +175,18 @@ public class Environment { return 
mergedEnv; } + /** + * Enriches an environment with new/modified properties and returns the new instance. + */ public static Environment enrich(Environment env, Map<String, String> properties) { final Environment enrichedEnv = new Environment(); // merge tables enrichedEnv.tables = new HashMap<>(env.getTables()); + // merge functions + enrichedEnv.functions = new HashMap<>(env.getFunctions()); + // enrich execution properties enrichedEnv.execution = Execution.enrich(env.execution, properties); http://git-wip-us.apache.org/repos/asf/flink/blob/abfdc1a2/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Source.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Source.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Source.java index 7b2498f..2bef257 100644 --- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Source.java +++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Source.java @@ -26,7 +26,7 @@ import java.util.HashMap; import java.util.Map; /** - * Configuration of a table source. Parses an entry in the `sources` list of an environment + * Configuration of a table source. Parses an entry in the `tables` list of an environment * file and translates to table descriptor properties. */ public class Source extends TableSourceDescriptor { @@ -49,6 +49,9 @@ public class Source extends TableSourceDescriptor { return properties; } + /** + * Creates a table source descriptor with the given config. 
+ */ public static Source create(Map<String, Object> config) { if (!config.containsKey(NAME)) { throw new SqlClientException("The 'name' attribute of a table source is missing."); http://git-wip-us.apache.org/repos/asf/flink/blob/abfdc1a2/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/UserDefinedFunction.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/UserDefinedFunction.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/UserDefinedFunction.java index 235f4ea..8652d40 100644 --- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/UserDefinedFunction.java +++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/UserDefinedFunction.java @@ -21,68 +21,47 @@ package org.apache.flink.table.client.config; import org.apache.flink.table.client.SqlClientException; import org.apache.flink.table.descriptors.DescriptorProperties; import org.apache.flink.table.descriptors.FunctionDescriptor; -import org.apache.flink.table.descriptors.FunctionValidator; +import java.util.HashMap; import java.util.Map; -import static org.apache.flink.table.client.config.UserDefinedFunction.From.CLASS; - /** * Descriptor for user-defined functions. */ public class UserDefinedFunction extends FunctionDescriptor { - private static final String FROM = "from"; - - private From from; - + private String name; private Map<String, String> properties; - private UserDefinedFunction(String name, From from, Map<String, String> properties) { - super(name); - this.from = from; + private static final String NAME = "name"; + + private UserDefinedFunction(String name, Map<String, String> properties) { + this.name = name; this.properties = properties; } - /** - * Gets where the user-defined function should be created from. 
- */ - public From getFrom() { - return from; + public String getName() { + return name; + } + + public Map<String, String> getProperties() { + return properties; } /** - * Creates a UDF descriptor with the given config. + * Creates a user-defined function descriptor with the given config. */ public static UserDefinedFunction create(Map<String, Object> config) { - Map<String, String> udfConfig = ConfigUtil.normalizeYaml(config); - if (!udfConfig.containsKey(FunctionValidator.FUNCTION_NAME())) { + if (!config.containsKey(NAME)) { throw new SqlClientException("The 'name' attribute of a function is missing."); } - - final String name = udfConfig.get(FunctionValidator.FUNCTION_NAME()); - if (name.trim().length() <= 0) { + final Object name = config.get(NAME); + if (name == null || !(name instanceof String) || ((String) name).trim().length() <= 0) { throw new SqlClientException("Invalid function name '" + name + "'."); } - - // the default value is "CLASS" - From fromValue = CLASS; - - if (udfConfig.containsKey(FROM)) { - final String from = udfConfig.get(FROM); - try { - fromValue = From.valueOf(from.toUpperCase()); - } catch (IllegalArgumentException ex) { - throw new SqlClientException("Unknown 'from' value '" + from + "'."); - } - } - - switch (fromValue) { - case CLASS: - return new UserDefinedFunction(name, fromValue, udfConfig); - default: - throw new SqlClientException("The from attribute can only be \"class\" now."); - } + final Map<String, Object> properties = new HashMap<>(config); + properties.remove(NAME); + return new UserDefinedFunction((String) name, ConfigUtil.normalizeYaml(properties)); } // -------------------------------------------------------------------------------------------- @@ -91,8 +70,4 @@ public class UserDefinedFunction extends FunctionDescriptor { public void addProperties(DescriptorProperties properties) { this.properties.forEach(properties::putString); } - - enum From { - CLASS - } } 
http://git-wip-us.apache.org/repos/asf/flink/blob/abfdc1a2/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java index 74e6a6b..f12ff4d 100644 --- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java +++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java @@ -46,6 +46,11 @@ public interface Executor { List<String> listTables(SessionContext session) throws SqlExecutionException; /** + * Lists all user-defined functions known to the executor. + */ + List<String> listUserDefinedFunctions(SessionContext session) throws SqlExecutionException; + + /** * Returns the schema of a table. Throws an exception if the table could not be found. The * schema might contain time attribute types for helping the user during debugging a query. 
*/ http://git-wip-us.apache.org/repos/asf/flink/blob/abfdc1a2/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java index 0cd9738..9cd3d8d 100644 --- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java +++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java @@ -48,10 +48,9 @@ import org.apache.flink.table.client.config.Deployment; import org.apache.flink.table.client.config.Environment; import org.apache.flink.table.client.gateway.SessionContext; import org.apache.flink.table.client.gateway.SqlExecutionException; -import org.apache.flink.table.descriptors.DescriptorProperties; import org.apache.flink.table.descriptors.TableSourceDescriptor; -import org.apache.flink.table.descriptors.service.FunctionService; import org.apache.flink.table.functions.AggregateFunction; +import org.apache.flink.table.functions.FunctionService; import org.apache.flink.table.functions.ScalarFunction; import org.apache.flink.table.functions.TableFunction; import org.apache.flink.table.functions.UserDefinedFunction; @@ -105,20 +104,18 @@ public class ExecutionContext<T> { tableSources = new HashMap<>(); mergedEnv.getTables().forEach((name, descriptor) -> { if (descriptor instanceof TableSourceDescriptor) { - TableSource<?> tableSource = TableSourceFactoryService.findAndCreateTableSource( - (TableSourceDescriptor) descriptor, classLoader); + final TableSource<?> tableSource = TableSourceFactoryService.findAndCreateTableSource( + (TableSourceDescriptor) descriptor, + classLoader); tableSources.put(name, 
tableSource); } }); - // generate user-defined functions + // create user-defined functions functions = new HashMap<>(); mergedEnv.getFunctions().forEach((name, descriptor) -> { - DescriptorProperties properties = new DescriptorProperties(true); - descriptor.addProperties(properties); - functions.put( - name, - FunctionService.generateUserDefinedFunction(properties, classLoader)); + final UserDefinedFunction function = FunctionService.createFunction(descriptor, classLoader); + functions.put(name, function); }); // convert deployment options into command line options that describe a cluster @@ -227,7 +224,7 @@ public class ExecutionContext<T> { // register table sources tableSources.forEach(tableEnv::registerTableSource); - // register UDFs + // register user-defined functions if (tableEnv instanceof StreamTableEnvironment) { StreamTableEnvironment streamTableEnvironment = (StreamTableEnvironment) tableEnv; functions.forEach((k, v) -> { @@ -237,6 +234,8 @@ public class ExecutionContext<T> { streamTableEnvironment.registerFunction(k, (AggregateFunction<?, ?>) v); } else if (v instanceof TableFunction) { streamTableEnvironment.registerFunction(k, (TableFunction<?>) v); + } else { + throw new SqlExecutionException("Unsupported function type: " + v.getClass().getName()); } }); } else { @@ -248,6 +247,8 @@ public class ExecutionContext<T> { batchTableEnvironment.registerFunction(k, (AggregateFunction<?, ?>) v); } else if (v instanceof TableFunction) { batchTableEnvironment.registerFunction(k, (TableFunction<?>) v); + } else { + throw new SqlExecutionException("Unsupported function type: " + v.getClass().getName()); } }); } http://git-wip-us.apache.org/repos/asf/flink/blob/abfdc1a2/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java index bd463ac..d658ee9 100644 --- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java +++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java @@ -191,6 +191,14 @@ public class LocalExecutor implements Executor { } @Override + public List<String> listUserDefinedFunctions(SessionContext session) throws SqlExecutionException { + final TableEnvironment tableEnv = getOrCreateExecutionContext(session) + .createEnvironmentInstance() + .getTableEnvironment(); + return Arrays.asList(tableEnv.listUserDefinedFunctions()); + } + + @Override public TableSchema getTableSchema(SessionContext session, String name) throws SqlExecutionException { final TableEnvironment tableEnv = getOrCreateExecutionContext(session) .createEnvironmentInstance() http://git-wip-us.apache.org/repos/asf/flink/blob/abfdc1a2/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java index 8ba88ec..d1ea5a1 100644 --- a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java +++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java @@ -21,6 +21,7 @@ package org.apache.flink.table.client.gateway.local; import org.apache.flink.api.common.ExecutionConfig; import 
org.apache.flink.client.cli.DefaultCLI; import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.client.config.Environment; import org.apache.flink.table.client.gateway.SessionContext; import org.apache.flink.table.client.gateway.utils.EnvironmentFileUtil; @@ -28,8 +29,10 @@ import org.apache.flink.table.client.gateway.utils.EnvironmentFileUtil; import org.apache.commons.cli.Options; import org.junit.Test; +import java.util.Arrays; import java.util.Collections; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; /** @@ -46,6 +49,17 @@ public class ExecutionContextTest { assertEquals(99, config.getAutoWatermarkInterval()); } + @Test + public void testFunctions() throws Exception { + final ExecutionContext<?> context = createExecutionContext(); + final TableEnvironment tableEnv = context.createEnvironmentInstance().getTableEnvironment(); + final String[] expected = new String[]{"scalarUDF", "tableUDF", "aggregateUDF"}; + final String[] actual = tableEnv.listUserDefinedFunctions(); + Arrays.sort(expected); + Arrays.sort(actual); + assertArrayEquals(expected, actual); + } + private <T> ExecutionContext<T> createExecutionContext() throws Exception { final Environment env = EnvironmentFileUtil.parseModified( DEFAULTS_ENVIRONMENT_FILE, http://git-wip-us.apache.org/repos/asf/flink/blob/abfdc1a2/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java index b32353f..8f62be4 100644 --- 
a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java +++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java @@ -66,7 +66,6 @@ import static org.junit.Assert.assertTrue; public class LocalExecutorITCase extends TestLogger { private static final String DEFAULTS_ENVIRONMENT_FILE = "test-sql-client-defaults.yaml"; - private static final String UDF_ENVIRONMENT_FILE = "test-sql-client-udf.yaml"; private static final int NUM_TMS = 2; private static final int NUM_SLOTS_PER_TM = 2; @@ -107,6 +106,17 @@ public class LocalExecutorITCase extends TestLogger { } @Test + public void testListUserDefinedFunctions() throws Exception { + final Executor executor = createDefaultExecutor(clusterClient); + final SessionContext session = new SessionContext("test-session", new Environment()); + + final List<String> actualTables = executor.listUserDefinedFunctions(session); + + final List<String> expectedTables = Arrays.asList("aggregateUDF", "tableUDF", "scalarUDF"); + assertEquals(expectedTables, actualTables); + } + + @Test public void testGetSessionProperties() throws Exception { final Executor executor = createDefaultExecutor(clusterClient); final SessionContext session = new SessionContext("test-session", new Environment()); @@ -150,68 +160,6 @@ public class LocalExecutorITCase extends TestLogger { } @Test(timeout = 30_000L) - public void testScalarUDF() throws Exception { - final Executor executor = - createDefaultExecutor(UDF_ENVIRONMENT_FILE, clusterClient); - final SessionContext session = new SessionContext("test-scalarUDF", new Environment()); - final ResultDescriptor rd = - executor.executeQuery(session, "SELECT scalarUDF(10)"); - final List<String> actualResults = - retrieveChangelogResult(executor, session, rd.getResultId()); - final List<String> expectedResults = new ArrayList<>(); - expectedResults.add("(true,15)"); - 
TestBaseUtils.compareResultCollections( - expectedResults, actualResults, Comparator.naturalOrder()); - } - - @Test(timeout = 30_000L) - public void testAggregateUDF() throws Exception { - final Executor executor = - createDefaultExecutor(UDF_ENVIRONMENT_FILE, clusterClient); - final SessionContext session = new SessionContext("test-aggregateUDF", new Environment()); - final ResultDescriptor rd = - executor.executeQuery(session, "SELECT aggregateUDF(cast(1 as BIGINT))"); - final List<String> actualResults = - retrieveChangelogResult(executor, session, rd.getResultId()); - final List<String> expectedResults = new ArrayList<>(); - expectedResults.add("(true,100)"); - TestBaseUtils.compareResultCollections( - expectedResults, actualResults, Comparator.naturalOrder()); - } - - @Test(timeout = 30_000L) - public void testTableUDF() throws Exception { - final URL url = getClass().getClassLoader().getResource("test-data.csv"); - Objects.requireNonNull(url); - final Map<String, String> replaceVars = new HashMap<>(); - replaceVars.put("$VAR_0", url.getPath()); - final Executor executor = - createModifiedExecutor(UDF_ENVIRONMENT_FILE, clusterClient, replaceVars); - - final SessionContext session = new SessionContext("test-aggregateUDF", new Environment()); - final ResultDescriptor rd = - executor.executeQuery( - session, - "SELECT w, l from TableNumber1, LATERAL TABLE(tableUDF(StringField1)) as T(w, l)"); - final List<String> actualResults = - retrieveChangelogResult(executor, session, rd.getResultId()); - final List<String> expectedResults = new ArrayList<>(); - expectedResults.add("(true,Hello,10)"); - expectedResults.add("(true,World,10)"); - expectedResults.add("(true,Hello,10)"); - expectedResults.add("(true,World,10)"); - expectedResults.add("(true,Hello,10)"); - expectedResults.add("(true,World,10)"); - expectedResults.add("(true,Hello,10)"); - expectedResults.add("(true,World,10)"); - expectedResults.add("(true,Hello,10)"); - expectedResults.add("(true,World,10)"); - 
expectedResults.add("(true,Hello,10)"); - expectedResults.add("(true,World!!!!,14)"); - TestBaseUtils.compareResultCollections(expectedResults, actualResults, Comparator.naturalOrder()); - } - - @Test(timeout = 30_000L) public void testStreamQueryExecutionChangelog() throws Exception { final URL url = getClass().getClassLoader().getResource("test-data.csv"); Objects.requireNonNull(url); @@ -226,7 +174,9 @@ public class LocalExecutorITCase extends TestLogger { try { // start job and retrieval - final ResultDescriptor desc = executor.executeQuery(session, "SELECT * FROM TableNumber1"); + final ResultDescriptor desc = executor.executeQuery( + session, + "SELECT scalarUDF(IntegerField1), StringField1 FROM TableNumber1"); assertFalse(desc.isMaterialized()); @@ -234,12 +184,12 @@ public class LocalExecutorITCase extends TestLogger { retrieveChangelogResult(executor, session, desc.getResultId()); final List<String> expectedResults = new ArrayList<>(); - expectedResults.add("(true,42,Hello World)"); - expectedResults.add("(true,22,Hello World)"); - expectedResults.add("(true,32,Hello World)"); - expectedResults.add("(true,32,Hello World)"); - expectedResults.add("(true,42,Hello World)"); - expectedResults.add("(true,52,Hello World!!!!)"); + expectedResults.add("(true,47,Hello World)"); + expectedResults.add("(true,27,Hello World)"); + expectedResults.add("(true,37,Hello World)"); + expectedResults.add("(true,37,Hello World)"); + expectedResults.add("(true,47,Hello World)"); + expectedResults.add("(true,57,Hello World!!!!)"); TestBaseUtils.compareResultCollections(expectedResults, actualResults, Comparator.naturalOrder()); } finally { @@ -262,19 +212,21 @@ public class LocalExecutorITCase extends TestLogger { try { // start job and retrieval - final ResultDescriptor desc = executor.executeQuery(session, "SELECT IntegerField1 FROM TableNumber1"); + final ResultDescriptor desc = executor.executeQuery( + session, + "SELECT scalarUDF(IntegerField1), StringField1 FROM 
TableNumber1"); assertTrue(desc.isMaterialized()); final List<String> actualResults = retrieveTableResult(executor, session, desc.getResultId()); final List<String> expectedResults = new ArrayList<>(); - expectedResults.add("42"); - expectedResults.add("22"); - expectedResults.add("32"); - expectedResults.add("32"); - expectedResults.add("42"); - expectedResults.add("52"); + expectedResults.add("47,Hello World"); + expectedResults.add("27,Hello World"); + expectedResults.add("37,Hello World"); + expectedResults.add("37,Hello World"); + expectedResults.add("47,Hello World"); + expectedResults.add("57,Hello World!!!!"); TestBaseUtils.compareResultCollections(expectedResults, actualResults, Comparator.naturalOrder()); } finally { @@ -317,32 +269,19 @@ public class LocalExecutorITCase extends TestLogger { } private <T> LocalExecutor createDefaultExecutor(ClusterClient<T> clusterClient) throws Exception { - return createDefaultExecutor(DEFAULTS_ENVIRONMENT_FILE, clusterClient); - } - - private <T> LocalExecutor createDefaultExecutor( - String configFileName, ClusterClient<T> - clusterClient) throws Exception { return new LocalExecutor( - EnvironmentFileUtil.parseModified(configFileName, Collections.singletonMap("$VAR_2", "batch")), + EnvironmentFileUtil.parseModified(DEFAULTS_ENVIRONMENT_FILE, Collections.singletonMap("$VAR_2", "batch")), Collections.emptyList(), clusterClient.getFlinkConfiguration(), new DummyCustomCommandLine<T>(clusterClient)); } private <T> LocalExecutor createModifiedExecutor(ClusterClient<T> clusterClient, Map<String, String> replaceVars) throws Exception { - return createModifiedExecutor(DEFAULTS_ENVIRONMENT_FILE, clusterClient, replaceVars); - } - - private <T> LocalExecutor createModifiedExecutor( - String configFileName, - ClusterClient<T> clusterClient, - Map<String, String> replaceVars) throws Exception { return new LocalExecutor( - EnvironmentFileUtil.parseModified(configFileName, replaceVars), - Collections.emptyList(), - 
clusterClient.getFlinkConfiguration(), - new DummyCustomCommandLine<T>(clusterClient)); + EnvironmentFileUtil.parseModified(DEFAULTS_ENVIRONMENT_FILE, replaceVars), + Collections.emptyList(), + clusterClient.getFlinkConfiguration(), + new DummyCustomCommandLine<T>(clusterClient)); } private List<String> retrieveTableResult( @@ -373,6 +312,7 @@ public class LocalExecutorITCase extends TestLogger { Executor executor, SessionContext session, String resultID) throws InterruptedException { + final List<String> actualResults = new ArrayList<>(); while (true) { Thread.sleep(50); // slow the processing down http://git-wip-us.apache.org/repos/asf/flink/blob/abfdc1a2/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/UserDefinedFunctions.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/UserDefinedFunctions.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/UserDefinedFunctions.java index b93ef66..9629c7f 100644 --- a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/UserDefinedFunctions.java +++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/UserDefinedFunctions.java @@ -27,12 +27,12 @@ import org.apache.flink.table.functions.TableFunction; import org.apache.flink.types.Row; /** - * A bunch of UDFs for SQL-Client test. + * A bunch of UDFs for testing the SQL Client. */ public class UserDefinedFunctions { /** - * The scalar function for SQL-Client test. + * The scalar function for SQL Client test. */ public static class ScalarUDF extends ScalarFunction { @@ -48,12 +48,12 @@ public class UserDefinedFunctions { } /** - * The aggregate function for SQL-Client test. + * The aggregate function for SQL Client test. 
*/ public static class AggregateUDF extends AggregateFunction<Long, Long> { public AggregateUDF(String name, Boolean flag, Integer value) { - + // do nothing } @Override @@ -67,7 +67,7 @@ public class UserDefinedFunctions { } public void accumulate(Long acc, Long value) { - + // do nothing } @Override @@ -77,10 +77,10 @@ public class UserDefinedFunctions { } /** - * The table function for SQL-Client test. + * The table function for SQL Client test. */ public static class TableUDF extends TableFunction<Row> { - private long extra = 2L; + private long extra; public TableUDF(Long extra) { this.extra = extra; @@ -100,6 +100,4 @@ public class UserDefinedFunctions { return Types.ROW(Types.STRING(), Types.LONG()); } } - } - http://git-wip-us.apache.org/repos/asf/flink/blob/abfdc1a2/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml index a9b4161..0d6778a 100644 --- a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml +++ b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml @@ -63,6 +63,31 @@ tables: line-delimiter: "\n" comment-prefix: "#" +functions: + - name: scalarUDF + from: class + class: org.apache.flink.table.client.gateway.utils.UserDefinedFunctions$ScalarUDF + constructor: + - 5 + - name: aggregateUDF + from: class + class: org.apache.flink.table.client.gateway.utils.UserDefinedFunctions$AggregateUDF + constructor: + - StarryName + - false + - class: java.lang.Integer + constructor: + - class: java.lang.String + constructor: + - type: VARCHAR + value: 3 + - name: tableUDF + from: class + class: org.apache.flink.table.client.gateway.utils.UserDefinedFunctions$TableUDF + constructor: + - type: LONG + value: 5 + execution: type: 
"$VAR_2" time-characteristic: event-time http://git-wip-us.apache.org/repos/asf/flink/blob/abfdc1a2/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-udf.yaml ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-udf.yaml b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-udf.yaml deleted file mode 100644 index 3e16030..0000000 --- a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-udf.yaml +++ /dev/null @@ -1,82 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ - -#============================================================================== -# TEST ENVIRONMENT FILE -# Intended for org.apache.flink.table.client.gateway.local.LocalExecutorITCase. 
-#============================================================================== - -# this file has variables that can be filled with content by replacing $VAR_XXX - -tables: - - name: TableNumber1 - type: source - schema: - - name: IntegerField1 - type: INT - - name: StringField1 - type: VARCHAR - connector: - type: filesystem - path: "$VAR_0" - format: - type: csv - fields: - - name: IntegerField1 - type: INT - - name: StringField1 - type: VARCHAR - line-delimiter: "\n" - comment-prefix: "#" - -functions: - - name: scalarUDF - from: class - class: org.apache.flink.table.client.gateway.utils.UserDefinedFunctions$ScalarUDF - constructor: - - 5 - - - name: aggregateUDF - from: class - class: org.apache.flink.table.client.gateway.utils.UserDefinedFunctions$AggregateUDF - constructor: - - StarryName - - false - - class: java.lang.Integer - constructor: - - class: java.lang.String - constructor: - - type: VARCHAR - value: 3 - - - name: tableUDF - from: class - class: org.apache.flink.table.client.gateway.utils.UserDefinedFunctions$TableUDF - constructor: - - type: LONG - value: 5 - -execution: - type: streaming - parallelism: 1 - result-mode: changelog - -deployment: - response-timeout: 5000 - - http://git-wip-us.apache.org/repos/asf/flink/blob/abfdc1a2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ClassInstance.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ClassInstance.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ClassInstance.scala new file mode 100644 index 0000000..007f2ba --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ClassInstance.scala @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.api.Types + +import scala.collection.mutable.ArrayBuffer + +/** + * Descriptor for a class instance. A class instance is a Java/Scala object created from a class + * with a public constructor (with or without parameters). + */ +class ClassInstance extends HierarchyDescriptor { + + private var className: Option[String] = None + + // the parameter is either a literal value or the instance of a class + private val constructor: ArrayBuffer[Either[LiteralValue, ClassInstance]] = ArrayBuffer() + + /** + * Sets the fully qualified class name for creating an instance. + * + * E.g. org.example.MyClass or org.example.MyClass$StaticInnerClass + * + * @param className fully qualified class name + */ + def of(className: String): ClassInstance = { + this.className = Option(className) + this + } + + /** + * Adds a constructor parameter value of literal type. The type is automatically derived from + * the value. Currently, this is supported for: BOOLEAN, INT, DOUBLE, and VARCHAR. Expression + * values are not allowed. 
+ * + * Examples: + * - "true", "false" -> BOOLEAN + * - "42", "-5" -> INT + * - "2.0", "1234.222" -> DOUBLE + * - VARCHAR otherwise + * + * For other types and explicit type declaration use [[parameter(String, String)]] or + * [[parameter(TypeInformation, String)]]. + * + */ + def parameterString(valueString: String): ClassInstance = { + constructor += Left(new LiteralValue().value(valueString)) + this + } + + /** + * Adds a constructor parameter value of literal type. The type is explicitly defined using a + * type string such as VARCHAR, FLOAT, BOOLEAN, INT, BIGINT, etc. The value is parsed + * accordingly. Expression values are not allowed. + * + * @param typeString the type string that defines how to parse the given value string + * @param valueString the literal value to be parsed + */ + def parameter(typeString: String, valueString: String): ClassInstance = { + constructor += Left(new LiteralValue().of(typeString).value(valueString)) + this + } + + /** + * Adds a constructor parameter value of literal type. The type is explicitly defined using + * type information. The value is parsed accordingly. Expression values are not allowed. + * + * @param typeInfo the type that defines how to parse the given value string + * @param valueString the literal value to be parsed + */ + def parameter(typeInfo: TypeInformation[_], valueString: String): ClassInstance = { + constructor += Left(new LiteralValue().of(typeInfo).value(valueString)) + this + } + + /** + * Adds a constructor parameter value of BOOLEAN type. + * + * @param value BOOLEAN value + */ + def parameter(value: Boolean): ClassInstance = { + constructor += Left(new LiteralValue().of(Types.BOOLEAN).value(value)) + this + } + + /** + * Adds a constructor parameter value of DOUBLE type. 
+ * + * @param value DOUBLE value + */ + def parameter(value: Double): ClassInstance = { + constructor += Left(new LiteralValue().of(Types.DOUBLE).value(value)) + this + } + + /** + * Adds a constructor parameter value of FLOAT type. + * + * @param value FLOAT value + */ + def parameter(value: Float): ClassInstance = { + constructor += Left(new LiteralValue().of(Types.FLOAT).value(value)) + this + } + + /** + * Adds a constructor parameter value of INT type. + * + * @param value INT value + */ + def parameter(value: Int): ClassInstance = { + constructor += Left(new LiteralValue().of(Types.INT).value(value)) + this + } + + /** + * Adds a constructor parameter value of VARCHAR type. + * + * @param value VARCHAR value + */ + def parameter(value: String): ClassInstance = { + constructor += Left(new LiteralValue().of(Types.STRING).value(value)) + this + } + + /** + * Adds a constructor parameter value of BIGINT type. + * + * @param value BIGINT value + */ + def parameter(value: Long): ClassInstance = { + constructor += Left(new LiteralValue().of(Types.LONG).value(value)) + this + } + + /** + * Adds a constructor parameter value of TINYINT type. + * + * @param value TINYINT value + */ + def parameter(value: Byte): ClassInstance = { + constructor += Left(new LiteralValue().of(Types.BYTE).value(value)) + this + } + + /** + * Adds a constructor parameter value of SMALLINT type. + * + * @param value SMALLINT value + */ + def parameter(value: Short): ClassInstance = { + constructor += Left(new LiteralValue().of(Types.SHORT).value(value)) + this + } + + /** + * Adds a constructor parameter value of DECIMAL type. + * + * @param value DECIMAL value + */ + def parameter(value: java.math.BigDecimal): ClassInstance = { + constructor += Left(new LiteralValue().of(Types.DECIMAL).value(value)) + this + } + + /** + * Adds a constructor parameter value of a class instance (i.e. a Java object with a public + * constructor). 
+ * + * @param classInstance description of a class instance (i.e. a Java object with a public + * constructor). + */ + def parameter(classInstance: ClassInstance): ClassInstance = { + constructor += Right(classInstance) + this + } + + /** + * Internal method for properties conversion. + */ + override private[flink] def addProperties(properties: DescriptorProperties): Unit = { + addPropertiesWithPrefix(HierarchyDescriptorValidator.EMPTY_PREFIX, properties) + } + + /** + * Internal method for properties conversion. + */ + override private[flink] def addPropertiesWithPrefix( + keyPrefix: String, + properties: DescriptorProperties): Unit = { + className.foreach(properties.putString(s"$keyPrefix${ClassInstanceValidator.CLASS}", _)) + var i = 0 + while (i < constructor.size) { + constructor(i) match { + case Left(literalValue) => + literalValue.addPropertiesWithPrefix( + s"$keyPrefix${ClassInstanceValidator.CONSTRUCTOR}.$i.", + properties) + case Right(classInstance) => + classInstance.addPropertiesWithPrefix( + s"$keyPrefix${ClassInstanceValidator.CONSTRUCTOR}.$i.", + properties) + } + i += 1 + } + } +} + +/** + * Descriptor for a class instance. A class instance is a Java/Scala object created from a class + * with a public constructor (with or without parameters). + */ +object ClassInstance { + + /** + * Descriptor for a class instance. A class instance is a Java/Scala object created from a class + * with a public constructor (with or without parameters). 
+ */ + def apply() = new ClassInstance +} http://git-wip-us.apache.org/repos/asf/flink/blob/abfdc1a2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ClassInstanceValidator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ClassInstanceValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ClassInstanceValidator.scala new file mode 100644 index 0000000..d9c3a02 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ClassInstanceValidator.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.descriptors.HierarchyDescriptorValidator.EMPTY_PREFIX + +import scala.collection.JavaConversions._ + +/** + * Validator for [[ClassInstance]]. 
+ */ +class ClassInstanceValidator(keyPrefix: String = EMPTY_PREFIX) + extends HierarchyDescriptorValidator(keyPrefix) { + + override def validateWithPrefix(keyPrefix: String, properties: DescriptorProperties): Unit = { + // check class name + properties.validateString(s"$keyPrefix${ClassInstanceValidator.CLASS}", isOptional = false, 1) + + // check constructor + val constructorPrefix = s"$keyPrefix${ClassInstanceValidator.CONSTRUCTOR}" + + val constructorProperties = properties.getVariableIndexedProperties(constructorPrefix, List()) + var i = 0 + while (i < constructorProperties.size()) { + // nested class instance + if (constructorProperties(i).containsKey(ClassInstanceValidator.CLASS)) { + val classInstanceValidator = new ClassInstanceValidator(s"$constructorPrefix.$i.") + classInstanceValidator.validate(properties) + } + // literal value + else { + val primitiveValueValidator = new LiteralValueValidator(s"$constructorPrefix.$i.") + primitiveValueValidator.validate(properties) + } + i += 1 + } + } +} + +object ClassInstanceValidator { + val CLASS = "class" + val CONSTRUCTOR = "constructor" +} http://git-wip-us.apache.org/repos/asf/flink/blob/abfdc1a2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ClassType.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ClassType.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ClassType.scala deleted file mode 100644 index 78ceb3a..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ClassType.scala +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.descriptors - -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.table.typeutils.TypeStringUtils - -import scala.collection.mutable.ArrayBuffer - -/** - * Descriptor for a class type. - * - * @param className the full name of the class (e.g., java.lang.Integer) - */ -class ClassType(var className: Option[String] = None) - extends HierarchyDescriptor { - - // the parameter is either a primitive type or a class type - private val constructor: ArrayBuffer[Either[PrimitiveType[_], ClassType]] = - ArrayBuffer() - - /** - * Sets the class name for the descriptor. - */ - def of(name: String): ClassType = { - this.className = Option(name) - this - } - - /** - * Adds the given string formatted value as a parameter, the type of which will be automatically - * derived (e.g., "true" -> Boolean, "1" -> Integer, "2.0" -> Double and "abc" -> String). - * - */ - def strParam(valueStr: String): ClassType = { - val typeString = PrimitiveTypeValidator.deriveTypeStrFromValueStr(valueStr) - param(typeString, valueStr) - } - - /** - * Adds the given string formatted value as a parameter, the type of which will be decided by the - * given type string (e.g., "DOUBLE", "VARCHAR"). 
- */ - def param(typeStr: String, valueStr: String): ClassType = { - param(TypeStringUtils.readTypeInfo(typeStr), valueStr) - } - - /** - * Adds the give string formatted value as a parameter, the type of which will be defined by the - * given type information. - */ - def param[T](typeInfo: TypeInformation[T], valueStr: String): ClassType = { - constructor += Left( - new PrimitiveType[T]() - .of(typeInfo) - .value(PrimitiveTypeValidator.deriveTypeAndValueStr(typeInfo, valueStr))) - this - } - - /** - * Adds the give value as a parameter, the type of which will be automatically derived. - */ - def param[T](value: T): ClassType = { - constructor += Left( - new PrimitiveType[T]() - .of(TypeInformation.of(value.getClass.asInstanceOf[Class[T]])) - .value(value)) - this - } - - /** - * Adds a parameter defined by the given class type descriptor. - */ - def param(field: ClassType): ClassType = { - constructor += Right(field) - this - } - - override private[flink] def addProperties(properties: DescriptorProperties): Unit = { - addPropertiesWithPrefix("", properties) - } - - override private[flink] def addPropertiesWithPrefix( - keyPrefix: String, - properties: DescriptorProperties): Unit = { - className.foreach(properties.putString(s"$keyPrefix${ClassTypeValidator.CLASS}", _)) - var i = 0 - while (i < constructor.size) { - constructor(i) match { - case Left(basicType) => - basicType.addPropertiesWithPrefix( - s"$keyPrefix${ClassTypeValidator.CONSTRUCTOR}.$i.", - properties) - case Right(classType) => - classType.addPropertiesWithPrefix( - s"$keyPrefix${ClassTypeValidator.CONSTRUCTOR}.$i.", - properties) - } - i += 1 - } - } -} - -object ClassType { - def apply() = new ClassType - def apply(className: String) = new ClassType(Option(className)) -} http://git-wip-us.apache.org/repos/asf/flink/blob/abfdc1a2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ClassTypeValidator.scala 
---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ClassTypeValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ClassTypeValidator.scala deleted file mode 100644 index ebb48b5..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ClassTypeValidator.scala +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.descriptors - -import org.apache.flink.table.api.ValidationException - -import scala.collection.JavaConversions._ - -/** - * Validator for [[ClassType]]. 
- */ -class ClassTypeValidator extends HierarchyDescriptorValidator { - override def validateWithPrefix(keyPrefix: String, properties: DescriptorProperties): Unit = { - - properties.validateString(s"$keyPrefix${ClassTypeValidator.CLASS}", isOptional = false) - - val constructorPrefix = s"$keyPrefix${ClassTypeValidator.CONSTRUCTOR}" - normalizeConstructorParams(constructorPrefix, properties) - - val constructorProps = - properties.getVariableIndexedProperties(constructorPrefix, List()) - var i = 0 - val primitiveTypeValidator = new PrimitiveTypeValidator - while (i < constructorProps.size()) { - if (constructorProps(i).containsKey(PrimitiveTypeValidator.PRIMITIVE_TYPE)) { - primitiveTypeValidator.validateWithPrefix(s"$constructorPrefix.$i.", properties) - } else if (constructorProps(i).containsKey(ClassTypeValidator.CLASS)) { - validateWithPrefix(s"$constructorPrefix.$i.", properties) - } else { - throw ValidationException("A constructor field must contain a 'class' or a 'type' key.") - } - i += 1 - } - } - - /** - * For each constructor parameter (e.g., constructor.0 = abc), we derive its type and replace it - * with the normalized form (e.g., constructor.0.type = VARCHAR, constructor.0.value = abc); - * - * @param constructorPrefix the prefix to get the constructor parameters - * @param properties the descriptor properties - */ - def normalizeConstructorParams( - constructorPrefix: String, - properties: DescriptorProperties): Unit = { - val constructorValues = properties.getListProperties(constructorPrefix) - constructorValues.foreach(kv => { - properties.unsafeRemove(kv._1) - val tp = PrimitiveTypeValidator.deriveTypeStrFromValueStr(kv._2) - properties.putString(s"${kv._1}.${PrimitiveTypeValidator.PRIMITIVE_TYPE}", tp) - properties.putString(s"${kv._1}.${PrimitiveTypeValidator.PRIMITIVE_VALUE}", kv._2) - }) - } -} - -object ClassTypeValidator { - val CLASS = "class" - val CONSTRUCTOR = "constructor" -} 
http://git-wip-us.apache.org/repos/asf/flink/blob/abfdc1a2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Descriptor.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Descriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Descriptor.scala index ad97ded..e21527b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Descriptor.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Descriptor.scala @@ -19,8 +19,12 @@ package org.apache.flink.table.descriptors /** - * A class that adds a set of string-based, normalized properties for describing a - * table source or table sink. + * A class that adds a set of string-based, normalized properties for describing DDL information. + * + * Typical characteristics of a descriptor are: + * - descriptors have a default constructor and a default 'apply()' method for Scala + * - descriptors themselves contain very little logic + * - corresponding validators validate the correctness (goal: have a single point of validation) */ abstract class Descriptor { http://git-wip-us.apache.org/repos/asf/flink/blob/abfdc1a2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala index 8d410d0..4f5bd81 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala @@ -249,7 +249,7 @@ 
class DescriptorProperties(normalizeKeys: Boolean = true) { /** * Returns a big decimal value under the given existing key. */ - def getBigDecimal(key: String): BigDecimal = { + def getBigDecimal(key: String): JBigDecimal = { getOptionalBigDecimal(key).orElseThrow(exceptionSupplier(key)) } @@ -292,7 +292,7 @@ class DescriptorProperties(normalizeKeys: Boolean = true) { } /** - * Returns a double value under the given key if it exists. + * Returns a double value under the given existing key. */ def getDouble(key: String): Double = { getOptionalDouble(key).orElseThrow(exceptionSupplier(key)) @@ -307,7 +307,7 @@ class DescriptorProperties(normalizeKeys: Boolean = true) { } /** - * Returns a float value under the given key if it exists. + * Returns a float value under the given existing key. */ def getFloat(key: String): Float = { getOptionalFloat(key).orElseThrow(exceptionSupplier(key)) @@ -489,13 +489,13 @@ class DescriptorProperties(normalizeKeys: Boolean = true) { // filter for index val escapedKey = Pattern.quote(key) - val pattern = Pattern.compile(s"$escapedKey\\.(\\d+)\\.(.*)") + val pattern = Pattern.compile(s"$escapedKey\\.(\\d+)(\\.)?(.*)") // extract index and property keys val indexes = properties.keys.flatMap { k => val matcher = pattern.matcher(k) if (matcher.find()) { - Some((JInt.parseInt(matcher.group(1)), matcher.group(2))) + Some((JInt.parseInt(matcher.group(1)), matcher.group(3))) } else { None } @@ -531,16 +531,6 @@ class DescriptorProperties(normalizeKeys: Boolean = true) { } /** - * Returns all properties under a group of array formed keys. - * - * E.g. constructor -> returns all constructor.# properties. - */ - def getListProperties(key: String): JMap[String, String] = { - val escapedKey = Pattern.quote(key) - properties.filterKeys(k => k.matches(s"$escapedKey\\.\\d+")).asJava - } - - /** * Returns all properties under a given key that contains an index in between. * * E.g. 
rowtime.0.name -> returns all rowtime.#.name properties @@ -744,7 +734,8 @@ class DescriptorProperties(normalizeKeys: Boolean = true) { */ def validateBigDecimal( key: String, - isOptional: Boolean): Unit = { + isOptional: Boolean) + : Unit = { if (!properties.contains(key)) { if (!isOptional) { @@ -767,14 +758,14 @@ class DescriptorProperties(normalizeKeys: Boolean = true) { def validateBigDecimal( key: String, isOptional: Boolean, - min: BigDecimal, // inclusive - max: BigDecimal) // inclusive - : Unit = { + min: JBigDecimal, // inclusive + max: JBigDecimal) // inclusive + : Unit = { validateComparable( key, isOptional, - min.bigDecimal, - max.bigDecimal, + min, + max, (value: String) => new JBigDecimal(value)) } @@ -783,7 +774,7 @@ class DescriptorProperties(normalizeKeys: Boolean = true) { */ def validateByte( key: String, - isOptional: Boolean): Unit = validateDouble(key, isOptional, Byte.MinValue, Byte.MaxValue) + isOptional: Boolean): Unit = validateByte(key, isOptional, Byte.MinValue, Byte.MaxValue) /** * Validates a byte property. The boundaries are inclusive. 
@@ -792,7 +783,7 @@ class DescriptorProperties(normalizeKeys: Boolean = true) { key: String, isOptional: Boolean, min: Byte) // inclusive - : Unit = { + : Unit = { validateByte(key, isOptional, min, Byte.MaxValue) } @@ -804,7 +795,7 @@ class DescriptorProperties(normalizeKeys: Boolean = true) { isOptional: Boolean, min: Byte, // inclusive max: Byte) // inclusive - : Unit = { + : Unit = { validateComparable(key, isOptional, new JByte(min), new JByte(max), JByte.valueOf) } @@ -822,7 +813,7 @@ class DescriptorProperties(normalizeKeys: Boolean = true) { key: String, isOptional: Boolean, min: Float) // inclusive - : Unit = { + : Unit = { validateFloat(key, isOptional, min, Float.MaxValue) } @@ -834,7 +825,7 @@ class DescriptorProperties(normalizeKeys: Boolean = true) { isOptional: Boolean, min: Float, // inclusive max: Float) // inclusive - : Unit = { + : Unit = { validateComparable(key, isOptional, new JFloat(min), new JFloat(max), JFloat.valueOf) } @@ -848,58 +839,27 @@ class DescriptorProperties(normalizeKeys: Boolean = true) { /** * Validates a short property. The boundaries are inclusive. */ - def validateFloat( + def validateShort( key: String, isOptional: Boolean, min: Short) // inclusive - : Unit = { + : Unit = { validateShort(key, isOptional, min, Short.MaxValue) } /** - * Validates a float property. The boundaries are inclusive. + * Validates a short property. The boundaries are inclusive. */ def validateShort( key: String, isOptional: Boolean, min: Short, // inclusive max: Short) // inclusive - : Unit = { + : Unit = { validateComparable(key, isOptional, new JShort(min), new JShort(max), JShort.valueOf) } /** - * Validates a property by first parsing the string value to a comparable object. - * The boundaries are inclusive. 
- */ - private def validateComparable[T <: Comparable[T]]( - key: String, - isOptional: Boolean, - min: T, - max: T, - parseFunction: String => T) - : Unit = { - if (!properties.contains(key)) { - if (!isOptional) { - throw new ValidationException(s"Could not find required property '$key'.") - } - } else { - try { - val value = parseFunction(properties(key)) - - if (value.compareTo(min) < 0 || value.compareTo(max) > 0) { - throw new ValidationException(s"Property '$key' must be a ${min.getClass.getSimpleName}" + - s" value between $min and $max but was: ${properties(key)}") - } - } catch { - case _: NumberFormatException => - throw new ValidationException( - s"Property '$key' must be a byte value but was: ${properties(key)}") - } - } - } - - /** * Validation for variable indexed properties. * * For example: @@ -1211,6 +1171,38 @@ class DescriptorProperties(normalizeKeys: Boolean = true) { } } } + + /** + * Validates a property by first parsing the string value to a comparable object. + * The boundaries are inclusive. 
+ */ + private def validateComparable[T <: Comparable[T]]( + key: String, + isOptional: Boolean, + min: T, + max: T, + parseFunction: String => T) + : Unit = { + if (!properties.contains(key)) { + if (!isOptional) { + throw new ValidationException(s"Could not find required property '$key'.") + } + } else { + val typeName = min.getClass.getSimpleName + try { + val value = parseFunction(properties(key)) + + if (value.compareTo(min) < 0 || value.compareTo(max) > 0) { + throw new ValidationException(s"Property '$key' must be a $typeName" + + s" value between $min and $max but was: ${properties(key)}") + } + } catch { + case _: NumberFormatException => + throw new ValidationException( + s"Property '$key' must be a $typeName value but was: ${properties(key)}") + } + } + } } object DescriptorProperties { http://git-wip-us.apache.org/repos/asf/flink/blob/abfdc1a2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FunctionDescriptor.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FunctionDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FunctionDescriptor.scala index f4c8363..13d5056 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FunctionDescriptor.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FunctionDescriptor.scala @@ -19,34 +19,38 @@ package org.apache.flink.table.descriptors /** - * Descriptor for describing a function that can be instantiated from somewhere (e.g., a class). - * - * @param name name of the function + * Descriptor for describing a function. 
*/ -class FunctionDescriptor(var name: String) extends Descriptor { +class FunctionDescriptor extends Descriptor { - var classDescriptor: Option[ClassType] = None + private var from: Option[String] = None + private var classInstance: Option[ClassInstance] = None /** - * Uses the class provided by the descriptor to instantiate the function. + * Creates a function from a class description. */ - def using(classDescriptor: ClassType): FunctionDescriptor = { - this.classDescriptor = Option(classDescriptor) + def fromClass(classType: ClassInstance): FunctionDescriptor = { + from = Some(FunctionDescriptorValidator.FROM_VALUE_CLASS) + this.classInstance = Option(classType) this } - def getDescriptorProperties: DescriptorProperties = { - val descriptorProperties = new DescriptorProperties() - addProperties(descriptorProperties) - descriptorProperties - } - + /** + * Internal method for format properties conversion. + */ override def addProperties(properties: DescriptorProperties): Unit = { - properties.putString(FunctionValidator.FUNCTION_NAME, name) - classDescriptor.foreach(_.addProperties(properties)) + from.foreach(properties.putString(FunctionDescriptorValidator.FROM, _)) + classInstance.foreach(_.addProperties(properties)) } } +/** + * Descriptor for describing a function. + */ object FunctionDescriptor { - def apply(name: String): FunctionDescriptor = new FunctionDescriptor(name) + + /** + * Descriptor for describing a function. 
+ */ + def apply(): FunctionDescriptor = new FunctionDescriptor() } http://git-wip-us.apache.org/repos/asf/flink/blob/abfdc1a2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FunctionDescriptorValidator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FunctionDescriptorValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FunctionDescriptorValidator.scala new file mode 100644 index 0000000..c119777 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FunctionDescriptorValidator.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.api.ValidationException +import org.apache.flink.table.descriptors.DescriptorProperties.toJava +import org.apache.flink.table.descriptors.FunctionDescriptorValidator.FROM + +import scala.collection.JavaConverters._ + +/** + * Validator for [[FunctionDescriptor]]. 
+ */ +class FunctionDescriptorValidator extends DescriptorValidator { + + override def validate(properties: DescriptorProperties): Unit = { + + val classValidation = (_: String) => { + new ClassInstanceValidator().validate(properties) + } + + // check for 'from' + if (properties.containsKey(FROM)) { + properties.validateEnum( + FROM, + isOptional = false, + Map( + FunctionDescriptorValidator.FROM_VALUE_CLASS -> toJava(classValidation) + ).asJava + ) + } else { + throw new ValidationException("Could not find 'from' property for function.") + } + } +} + +object FunctionDescriptorValidator { + + val FROM = "from" + val FROM_VALUE_CLASS = "class" +} http://git-wip-us.apache.org/repos/asf/flink/blob/abfdc1a2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FunctionValidator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FunctionValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FunctionValidator.scala deleted file mode 100644 index f36b9c2..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FunctionValidator.scala +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.descriptors - -import org.apache.flink.table.functions.UserDefinedFunction - -import scala.collection.JavaConverters._ - -/** - * Validator for [[FunctionDescriptor]]. - */ -class FunctionValidator extends DescriptorValidator { - - override def validate(properties: DescriptorProperties): Unit = { - properties.validateString(FunctionValidator.FUNCTION_NAME, isOptional = false, 1) - new ClassTypeValidator().validate(properties) - } -} - -object FunctionValidator { - - val FUNCTION_NAME = "name" - -} - - http://git-wip-us.apache.org/repos/asf/flink/blob/abfdc1a2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptor.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptor.scala index f97c807..b958b1c 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptor.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptor.scala @@ -25,10 +25,11 @@ package org.apache.flink.table.descriptors abstract class HierarchyDescriptor extends Descriptor { /** - * Internal method for properties conversion. All the property keys will be prefixed according - * to the level. + * Internal method for properties conversion. All the property keys will be prefixed with the + * given key prefix. 
*/ private[flink] def addPropertiesWithPrefix( - keyPrefix: String, properties: DescriptorProperties): Unit + keyPrefix: String, + properties: DescriptorProperties): Unit } http://git-wip-us.apache.org/repos/asf/flink/blob/abfdc1a2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptorValidator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptorValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptorValidator.scala index 73dd1f9..d7bf573 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptorValidator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptorValidator.scala @@ -20,16 +20,22 @@ package org.apache.flink.table.descriptors /** * Validator for a [[HierarchyDescriptor]]. + * + * @param keyPrefix prefix to be added to every property before validation */ -trait HierarchyDescriptorValidator extends DescriptorValidator{ +abstract class HierarchyDescriptorValidator(keyPrefix: String) extends DescriptorValidator{ - def validate(properties: DescriptorProperties): Unit = { - validateWithPrefix("", properties) + final def validate(properties: DescriptorProperties): Unit = { + validateWithPrefix(keyPrefix, properties) } /** * Performs validation with a prefix for the keys. 
*/ - def validateWithPrefix(keyPrefix: String, properties: DescriptorProperties): Unit + protected def validateWithPrefix(keyPrefix: String, properties: DescriptorProperties): Unit } + +object HierarchyDescriptorValidator { + val EMPTY_PREFIX = "" +} http://git-wip-us.apache.org/repos/asf/flink/blob/abfdc1a2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/LiteralValue.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/LiteralValue.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/LiteralValue.scala new file mode 100644 index 0000000..ac86ba7 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/LiteralValue.scala @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.api.ValidationException +import org.apache.flink.table.typeutils.TypeStringUtils +import org.apache.flink.util.Preconditions + +/** + * Descriptor for a literal value. 
A literal value consists of a type and the actual value. + * Expression values are not allowed. + * + * If no type is set, the type is automatically derived from the value. Currently, + * this is supported for: BOOLEAN, INT, DOUBLE, and VARCHAR. + * + * Examples: + * - "true", "false" -> BOOLEAN + * - "42", "-5" -> INT + * - "2.0", "1234.222" -> DOUBLE + * - VARCHAR otherwise + */ +class LiteralValue extends HierarchyDescriptor { + + var typeInfo: Option[String] = None + var value: Option[Any] = None + + /** + * Type information of the literal value. E.g. Types.BOOLEAN. + * + * @param typeInfo type information describing the value (must not be null) + */ + def of(typeInfo: TypeInformation[_]): LiteralValue = { + Preconditions.checkNotNull(typeInfo, "Type information must not be null.") + this.typeInfo = Option(TypeStringUtils.writeTypeInfo(typeInfo)) + this + } + + /** + * Type string of the literal value. E.g. "BOOLEAN". + * + * @param typeString type string describing the value + */ + def of(typeString: String): LiteralValue = { + this.typeInfo = Option(typeString) + this + } + + /** + * Literal BOOLEAN value. + * + * @param value literal BOOLEAN value + */ + def value(value: Boolean): LiteralValue = { + this.value = Option(value) + this + } + + /** + * Literal INT value. + * + * @param value literal INT value + */ + def value(value: Int): LiteralValue = { + this.value = Option(value) + this + } + + /** + * Literal DOUBLE value. + * + * @param value literal DOUBLE value + */ + def value(value: Double): LiteralValue = { + this.value = Option(value) + this + } + + /** + * Literal FLOAT value. + * + * @param value literal FLOAT value + */ + def value(value: Float): LiteralValue = { + this.value = Option(value) + this + } + + /** + * Literal value either for an explicit VARCHAR type or automatically derived type. + * + * If no type is set, the type is automatically derived from the value. Currently, + * this is supported for: BOOLEAN, INT, DOUBLE, and VARCHAR. 
+ * + * @param value literal value + */ + def value(value: String): LiteralValue = { + this.value = Option(value) + this + } + + /** + * Literal BIGINT value. + * + * @param value literal BIGINT value + */ + def value(value: Long): LiteralValue = { + this.value = Option(value) + this + } + + /** + * Literal TINYINT value. + * + * @param value literal TINYINT value + */ + def value(value: Byte): LiteralValue = { + this.value = Option(value) + this + } + + /** + * Literal SMALLINT value. + * + * @param value literal SMALLINT value + */ + def value(value: Short): LiteralValue = { + this.value = Option(value) + this + } + + /** + * Literal DECIMAL value. + * + * @param value literal DECIMAL value + */ + def value(value: java.math.BigDecimal): LiteralValue = { + this.value = Option(value) + this + } + + /** + * Internal method for properties conversion. + */ + override private[flink] def addProperties(properties: DescriptorProperties): Unit = { + addPropertiesWithPrefix(HierarchyDescriptorValidator.EMPTY_PREFIX, properties) + } + + /** + * Internal method for properties conversion. + */ + override private[flink] def addPropertiesWithPrefix( + keyPrefix: String, + properties: DescriptorProperties) + : Unit = { + + typeInfo match { + // explicit type + case Some(ti) => + properties.putString(keyPrefix + "type", ti) + value.foreach(v => properties.putString(keyPrefix + "value", String.valueOf(v))) + // implicit type + case None => + // do not allow values in top-level + if (keyPrefix == HierarchyDescriptorValidator.EMPTY_PREFIX) { + throw new ValidationException( + "Literal values with implicit type must not exist in the top level of a hierarchy.") + } + value.foreach { v => + properties.putString(keyPrefix.substring(0, keyPrefix.length - 1), String.valueOf(v)) + } + } + } +} + +/** + * Descriptor for a literal value. A literal value consists of a type and the actual value. + * Expression values are not allowed. 
+ * + * If no type is set, the type is automatically derived from the value. Currently, + * this is supported for: BOOLEAN, INT, DOUBLE, and VARCHAR. + * + * Examples: + * - "true", "false" -> BOOLEAN + * - "42", "-5" -> INT + * - "2.0", "1234.222" -> DOUBLE + * - VARCHAR otherwise + */ +object LiteralValue { + + /** + * Descriptor for a literal value. A literal value consists of a type and the actual value. + * Expression values are not allowed. + * + * If no type is set, the type is automatically derived from the value. Currently, + * this is supported for: BOOLEAN, INT, DOUBLE, and VARCHAR. + * + * Examples: + * - "true", "false" -> BOOLEAN + * - "42", "-5" -> INT + * - "2.0", "1234.222" -> DOUBLE + * - VARCHAR otherwise + */ + def apply() = new LiteralValue() +}