[FLINK-8863] [sql-client] Make user-defined functions more robust

- Simplify code and fix various bugs
- Add more tests
- Refactor various names for descriptors and variables
- Make 'from' property mandatory
- Make LiteralValue public API


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/abfdc1a2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/abfdc1a2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/abfdc1a2

Branch: refs/heads/master
Commit: abfdc1a2d649029d9f8a22c8d4880dc5068149c8
Parents: 8014dad
Author: Timo Walther <twal...@apache.org>
Authored: Mon Jul 9 23:07:38 2018 +0200
Committer: Timo Walther <twal...@apache.org>
Committed: Tue Jul 10 13:55:21 2018 +0200

----------------------------------------------------------------------
 .../conf/sql-client-defaults.yaml               |  18 +-
 .../flink/table/client/cli/CliClient.java       |  19 ++
 .../flink/table/client/cli/CliStrings.java      |   1 +
 .../table/client/cli/SqlCommandParser.java      |   1 +
 .../flink/table/client/config/Environment.java  |  36 ++-
 .../flink/table/client/config/Source.java       |   5 +-
 .../client/config/UserDefinedFunction.java      |  63 ++---
 .../flink/table/client/gateway/Executor.java    |   5 +
 .../client/gateway/local/ExecutionContext.java  |  23 +-
 .../client/gateway/local/LocalExecutor.java     |   8 +
 .../gateway/local/ExecutionContextTest.java     |  14 ++
 .../gateway/local/LocalExecutorITCase.java      | 130 +++-------
 .../gateway/utils/UserDefinedFunctions.java     |  16 +-
 .../resources/test-sql-client-defaults.yaml     |  25 ++
 .../src/test/resources/test-sql-client-udf.yaml |  82 -------
 .../flink/table/descriptors/ClassInstance.scala | 238 +++++++++++++++++++
 .../descriptors/ClassInstanceValidator.scala    |  59 +++++
 .../flink/table/descriptors/ClassType.scala     | 123 ----------
 .../table/descriptors/ClassTypeValidator.scala  |  75 ------
 .../flink/table/descriptors/Descriptor.scala    |   8 +-
 .../descriptors/DescriptorProperties.scala      | 114 +++++----
 .../table/descriptors/FunctionDescriptor.scala  |  38 +--
 .../FunctionDescriptorValidator.scala           |  57 +++++
 .../table/descriptors/FunctionValidator.scala   |  42 ----
 .../table/descriptors/HierarchyDescriptor.scala |   7 +-
 .../HierarchyDescriptorValidator.scala          |  14 +-
 .../flink/table/descriptors/LiteralValue.scala  | 221 +++++++++++++++++
 .../descriptors/LiteralValueValidator.scala     | 144 +++++++++++
 .../flink/table/descriptors/PrimitiveType.scala |  55 -----
 .../descriptors/PrimitiveTypeValidator.scala    | 146 ------------
 .../descriptors/service/FunctionService.scala   |  87 -------
 .../flink/table/functions/FunctionService.scala | 156 ++++++++++++
 .../table/descriptors/ClassInstanceTest.scala   | 102 ++++++++
 .../flink/table/descriptors/ClassTypeTest.scala |  77 ------
 .../flink/table/descriptors/CsvTest.scala       |   3 +
 .../descriptors/FunctionDescriptorTest.scala    |  59 +++++
 .../flink/table/descriptors/FunctionTest.scala  |  65 -----
 .../table/descriptors/LiteralValueTest.scala    | 125 ++++++++++
 .../table/descriptors/PrimitiveTypeTest.scala   | 117 ---------
 .../table/functions/FunctionServiceTest.scala   | 125 ++++++++++
 40 files changed, 1576 insertions(+), 1127 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/abfdc1a2/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml b/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml
index 7ca7776..51e6e95 100644
--- a/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml
+++ b/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml
@@ -21,11 +21,14 @@
 # Defaults might be overwritten by a session specific environment.
 
 
+# See the Table API & SQL documentation for details about supported properties.
+
+
 #==============================================================================
 # Table Sources
 #==============================================================================
 
-# Define table sources and sinks here. See the Table API & SQL documentation for details.
+# Define table sources and sinks here.
 
 tables: [] # empty list
 # A typical table source definition looks like:
@@ -36,6 +39,19 @@ tables: [] # empty list
 #   schema: ...
 
 #==============================================================================
+# User-defined functions
+#==============================================================================
+
+# Define scalar, aggregate, or table functions here.
+
+functions: [] # empty list
+# A typical function definition looks like:
+# - name: ...
+#   from: class
+#   class: ...
+#   constructor: ...
+
+#==============================================================================
 # Execution properties
 #==============================================================================
 

http://git-wip-us.apache.org/repos/asf/flink/blob/abfdc1a2/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
index 233e49b..7bf7bb7 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
@@ -201,6 +201,9 @@ public class CliClient {
                                case SHOW_TABLES:
                                        callShowTables(cmdCall);
                                        break;
+                               case SHOW_FUNCTIONS:
+                                       callShowFunctions(cmdCall);
+                                       break;
                                case DESCRIBE:
                                        callDescribe(cmdCall);
                                        break;
@@ -284,6 +287,22 @@ public class CliClient {
                terminal.flush();
        }
 
+       private void callShowFunctions(SqlCommandCall cmdCall) {
+               final List<String> functions;
+               try {
+                       functions = executor.listUserDefinedFunctions(context);
+               } catch (SqlExecutionException e) {
+                       printException(e);
+                       return;
+               }
+               if (functions.isEmpty()) {
+                       terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_EMPTY).toAnsi());
+               } else {
+                       functions.forEach((v) -> terminal.writer().println(v));
+               }
+               terminal.flush();
+       }
+
        private void callDescribe(SqlCommandCall cmdCall) {
                final TableSchema schema;
                try {

http://git-wip-us.apache.org/repos/asf/flink/blob/abfdc1a2/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
index 1e8f696..b917317 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
@@ -45,6 +45,7 @@ public final class CliStrings {
                .append(formatCommand(SqlCommand.CLEAR, "Clears the current terminal."))
                .append(formatCommand(SqlCommand.HELP, "Prints the available commands."))
                .append(formatCommand(SqlCommand.SHOW_TABLES, "Shows all registered tables."))
+               .append(formatCommand(SqlCommand.SHOW_FUNCTIONS, "Shows all registered user-defined functions."))
                .append(formatCommand(SqlCommand.DESCRIBE, "Describes the schema of a table with the given name."))
                .append(formatCommand(SqlCommand.EXPLAIN, "Describes the execution plan of a query or table with the given name."))
                .append(formatCommand(SqlCommand.SELECT, "Executes a SQL SELECT query on the Flink cluster."))

http://git-wip-us.apache.org/repos/asf/flink/blob/abfdc1a2/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java
index 214a17d..5543a3e 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java
@@ -86,6 +86,7 @@ public final class SqlCommandParser {
                CLEAR("clear"),
                HELP("help"),
                SHOW_TABLES("show tables"),
+               SHOW_FUNCTIONS("show functions"),
                DESCRIBE("describe"),
                EXPLAIN("explain"),
                SELECT("select"),

http://git-wip-us.apache.org/repos/asf/flink/blob/abfdc1a2/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
index c1db4c1..b26c45f 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
@@ -42,17 +42,17 @@ public class Environment {
 
        private Map<String, TableDescriptor> tables;
 
+       private Map<String, UserDefinedFunction> functions;
+
        private Execution execution;
 
        private Deployment deployment;
 
-       private Map<String, UserDefinedFunction> functions;
-
        public Environment() {
                this.tables = Collections.emptyMap();
+               this.functions = Collections.emptyMap();
                this.execution = new Execution();
                this.deployment = new Deployment();
-               this.functions = Collections.emptyMap();
        }
 
        public Map<String, TableDescriptor> getTables() {
@@ -69,7 +69,7 @@
                                config.remove(TableDescriptorValidator.TABLE_TYPE());
                                final Source s = Source.create(config);
                                if (this.tables.containsKey(s.getName())) {
-                                       throw new SqlClientException("Duplicate source name '" + s + "'.");
+                                       throw new SqlClientException("Duplicate source name '" + s.getName() + "'.");
                                }
                                this.tables.put(s.getName(), s);
                        } else {
@@ -79,11 +79,18 @@ public class Environment {
                });
        }
 
+       public Map<String, UserDefinedFunction> getFunctions() {
+               return functions;
+       }
+
        public void setFunctions(List<Map<String, Object>> functions) {
                this.functions = new HashMap<>(functions.size());
                functions.forEach(config -> {
                        final UserDefinedFunction f = UserDefinedFunction.create(config);
-                       this.functions.put(f.name(), f);
+                       if (this.functions.containsKey(f.getName())) {
+                               throw new SqlClientException("Duplicate function name '" + f.getName() + "'.");
+                       }
+                       this.functions.put(f.getName(), f);
                });
        }
 
@@ -103,10 +110,6 @@ public class Environment {
                return deployment;
        }
 
-       public Map<String, UserDefinedFunction> getFunctions() {
-               return functions;
-       }
-
        @Override
        public String toString() {
                final StringBuilder sb = new StringBuilder();
@@ -117,6 +120,13 @@ public class Environment {
                        table.addProperties(props);
                        props.asMap().forEach((k, v) -> sb.append("  ").append(k).append(": ").append(v).append('\n'));
                });
+               sb.append("=================== Functions ====================\n");
+               functions.forEach((name, function) -> {
+                       sb.append("- name: ").append(name).append("\n");
+                       final DescriptorProperties props = new DescriptorProperties(true);
+                       function.addProperties(props);
+                       props.asMap().forEach((k, v) -> sb.append("  ").append(k).append(": ").append(v).append('\n'));
+               });
                sb.append("=================== Execution ====================\n");
                execution.toProperties().forEach((k, v) -> sb.append(k).append(": ").append(v).append('\n'));
                sb.append("=================== Deployment ===================\n");
@@ -153,7 +163,7 @@ public class Environment {
 
                // merge functions
                final Map<String, UserDefinedFunction> functions = new HashMap<>(env1.getFunctions());
-               mergedEnv.getFunctions().putAll(env2.getFunctions());
+               functions.putAll(env2.getFunctions());
                mergedEnv.functions = functions;
 
                // merge execution properties
@@ -165,12 +175,18 @@ public class Environment {
                return mergedEnv;
        }
 
+       /**
+        * Enriches an environment with new/modified properties and returns the new instance.
+        */
        public static Environment enrich(Environment env, Map<String, String> properties) {
                final Environment enrichedEnv = new Environment();
 
                // merge tables
                enrichedEnv.tables = new HashMap<>(env.getTables());
 
+               // merge functions
+               enrichedEnv.functions = new HashMap<>(env.getFunctions());
+
                // enrich execution properties
                enrichedEnv.execution = Execution.enrich(env.execution, properties);
 

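The merge gives the second environment (typically the session environment) precedence over the first; a minimal standalone sketch of that precedence rule, with illustrative keys and values rather than the actual Environment API:

    import java.util.HashMap;
    import java.util.Map;

    public class MergeSemanticsSketch {
        public static void main(String[] args) {
            // Same pattern as Environment.merge above: copy env1, then let env2 win.
            Map<String, String> env1Functions = new HashMap<>();
            env1Functions.put("myUDF", "from-defaults");

            Map<String, String> env2Functions = new HashMap<>();
            env2Functions.put("myUDF", "from-session");

            Map<String, String> merged = new HashMap<>(env1Functions);
            merged.putAll(env2Functions);

            System.out.println(merged.get("myUDF")); // prints: from-session
        }
    }
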
http://git-wip-us.apache.org/repos/asf/flink/blob/abfdc1a2/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Source.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Source.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Source.java
index 7b2498f..2bef257 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Source.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Source.java
@@ -26,7 +26,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 /**
- * Configuration of a table source. Parses an entry in the `sources` list of an environment
+ * Configuration of a table source. Parses an entry in the `tables` list of an environment
  * file and translates to table descriptor properties.
  */
 public class Source extends TableSourceDescriptor {
@@ -49,6 +49,9 @@ public class Source extends TableSourceDescriptor {
                return properties;
        }
 
+       /**
+        * Creates a table source descriptor with the given config.
+        */
        public static Source create(Map<String, Object> config) {
                if (!config.containsKey(NAME)) {
                        throw new SqlClientException("The 'name' attribute of a table source is missing.");

http://git-wip-us.apache.org/repos/asf/flink/blob/abfdc1a2/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/UserDefinedFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/UserDefinedFunction.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/UserDefinedFunction.java
index 235f4ea..8652d40 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/UserDefinedFunction.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/UserDefinedFunction.java
@@ -21,68 +21,47 @@ package org.apache.flink.table.client.config;
 import org.apache.flink.table.client.SqlClientException;
 import org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.table.descriptors.FunctionDescriptor;
-import org.apache.flink.table.descriptors.FunctionValidator;
 
+import java.util.HashMap;
 import java.util.Map;
 
-import static org.apache.flink.table.client.config.UserDefinedFunction.From.CLASS;
-
 /**
  * Descriptor for user-defined functions.
  */
 public class UserDefinedFunction extends FunctionDescriptor {
 
-       private static final String FROM = "from";
-
-       private From from;
-
+       private String name;
        private Map<String, String> properties;
 
-       private UserDefinedFunction(String name, From from, Map<String, String> properties) {
-               super(name);
-               this.from = from;
+       private static final String NAME = "name";
+
+       private UserDefinedFunction(String name, Map<String, String> properties) {
+               this.name = name;
                this.properties = properties;
        }
 
-       /**
-        * Gets where the user-defined function should be created from.
-        */
-       public From getFrom() {
-               return from;
+       public String getName() {
+               return name;
+       }
+
+       public Map<String, String> getProperties() {
+               return properties;
        }
 
        /**
-        * Creates a UDF descriptor with the given config.
+        * Creates a user-defined function descriptor with the given config.
         */
        public static UserDefinedFunction create(Map<String, Object> config) {
-               Map<String, String> udfConfig = ConfigUtil.normalizeYaml(config);
-               if (!udfConfig.containsKey(FunctionValidator.FUNCTION_NAME())) {
+               if (!config.containsKey(NAME)) {
                        throw new SqlClientException("The 'name' attribute of a function is missing.");
                }
-
-               final String name = udfConfig.get(FunctionValidator.FUNCTION_NAME());
-               if (name.trim().length() <= 0) {
+               final Object name = config.get(NAME);
+               if (!(name instanceof String) || ((String) name).trim().isEmpty()) {
                        throw new SqlClientException("Invalid function name '" + name + "'.");
                }
-
-               // the default value is "CLASS"
-               From fromValue = CLASS;
-
-               if (udfConfig.containsKey(FROM)) {
-                       final String from = udfConfig.get(FROM);
-                       try {
-                               fromValue = From.valueOf(from.toUpperCase());
-                       } catch (IllegalArgumentException ex) {
-                               throw new SqlClientException("Unknown 'from' value '" + from + "'.");
-                       }
-               }
-
-               switch (fromValue) {
-                       case CLASS:
-                               return new UserDefinedFunction(name, fromValue, udfConfig);
-                       default:
-                               throw new SqlClientException("The from attribute can only be \"class\" now.");
-               }
+               final Map<String, Object> properties = new HashMap<>(config);
+               properties.remove(NAME);
+               return new UserDefinedFunction((String) name, ConfigUtil.normalizeYaml(properties));
        }
 
        // --------------------------------------------------------------------------------------------
@@ -91,8 +70,4 @@ public class UserDefinedFunction extends FunctionDescriptor {
        public void addProperties(DescriptorProperties properties) {
                this.properties.forEach(properties::putString);
        }
-
-       enum From {
-               CLASS
-       }
 }

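A minimal sketch of how the reworked create() consumes a parsed YAML entry (the class name 'org.example.MyScalarFunction' is a placeholder): the 'name' attribute is validated and stripped, and the remaining keys are normalized into flat string properties:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flink.table.client.config.UserDefinedFunction;

    public class UdfCreateSketch {
        public static void main(String[] args) {
            final Map<String, Object> config = new HashMap<>();
            config.put("name", "scalarUDF");
            config.put("from", "class");
            config.put("class", "org.example.MyScalarFunction"); // placeholder class

            final UserDefinedFunction udf = UserDefinedFunction.create(config);
            System.out.println(udf.getName());       // scalarUDF
            System.out.println(udf.getProperties()); // {from=class, class=org.example.MyScalarFunction}
        }
    }
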
http://git-wip-us.apache.org/repos/asf/flink/blob/abfdc1a2/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
index 74e6a6b..f12ff4d 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
@@ -46,6 +46,11 @@ public interface Executor {
        List<String> listTables(SessionContext session) throws SqlExecutionException;

        /**
+        * Lists all user-defined functions known to the executor.
+        */
+       List<String> listUserDefinedFunctions(SessionContext session) throws SqlExecutionException;
+
+       /**
         * Returns the schema of a table. Throws an exception if the table could not be found. The
         * schema might contain time attribute types for helping the user during debugging a query.
         */

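A minimal usage sketch of the new interface method, assuming an already-configured Executor and session:

    import java.util.List;

    import org.apache.flink.table.client.gateway.Executor;
    import org.apache.flink.table.client.gateway.SessionContext;

    public class ListFunctionsSketch {
        static void printFunctions(Executor executor, SessionContext session) {
            // LocalExecutor backs this with TableEnvironment#listUserDefinedFunctions.
            final List<String> functions = executor.listUserDefinedFunctions(session);
            functions.forEach(System.out::println); // e.g. aggregateUDF, scalarUDF, tableUDF
        }
    }
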
http://git-wip-us.apache.org/repos/asf/flink/blob/abfdc1a2/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index 0cd9738..9cd3d8d 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -48,10 +48,9 @@ import org.apache.flink.table.client.config.Deployment;
 import org.apache.flink.table.client.config.Environment;
 import org.apache.flink.table.client.gateway.SessionContext;
 import org.apache.flink.table.client.gateway.SqlExecutionException;
-import org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.table.descriptors.TableSourceDescriptor;
-import org.apache.flink.table.descriptors.service.FunctionService;
 import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.FunctionService;
 import org.apache.flink.table.functions.ScalarFunction;
 import org.apache.flink.table.functions.TableFunction;
 import org.apache.flink.table.functions.UserDefinedFunction;
@@ -105,20 +104,18 @@ public class ExecutionContext<T> {
                tableSources = new HashMap<>();
                mergedEnv.getTables().forEach((name, descriptor) -> {
                        if (descriptor instanceof TableSourceDescriptor) {
-                               TableSource<?> tableSource = TableSourceFactoryService.findAndCreateTableSource(
-                                               (TableSourceDescriptor) descriptor, classLoader);
+                               final TableSource<?> tableSource = TableSourceFactoryService.findAndCreateTableSource(
+                                       (TableSourceDescriptor) descriptor,
+                                       classLoader);
                                tableSources.put(name, tableSource);
                        }
                });
 
-               // generate user-defined functions
+               // create user-defined functions
                functions = new HashMap<>();
                mergedEnv.getFunctions().forEach((name, descriptor) -> {
-                       DescriptorProperties properties = new DescriptorProperties(true);
-                       descriptor.addProperties(properties);
-                       functions.put(
-                                       name,
-                                       FunctionService.generateUserDefinedFunction(properties, classLoader));
+                       final UserDefinedFunction function = FunctionService.createFunction(descriptor, classLoader);
+                       functions.put(name, function);
                });
 
                // convert deployment options into command line options that describe a cluster
@@ -227,7 +224,7 @@ public class ExecutionContext<T> {
                        // register table sources
                        tableSources.forEach(tableEnv::registerTableSource);
 
-                       // register UDFs
+                       // register user-defined functions
                        if (tableEnv instanceof StreamTableEnvironment) {
                                StreamTableEnvironment streamTableEnvironment = (StreamTableEnvironment) tableEnv;
                                functions.forEach((k, v) -> {
@@ -237,6 +234,8 @@
                                                streamTableEnvironment.registerFunction(k, (AggregateFunction<?, ?>) v);
                                        } else if (v instanceof TableFunction) {
                                                streamTableEnvironment.registerFunction(k, (TableFunction<?>) v);
+                                       } else {
+                                               throw new SqlExecutionException("Unsupported function type: " + v.getClass().getName());
                                        }
                                });
                        } else {
@@ -248,6 +247,8 @@
                                                batchTableEnvironment.registerFunction(k, (AggregateFunction<?, ?>) v);
                                        } else if (v instanceof TableFunction) {
                                                batchTableEnvironment.registerFunction(k, (TableFunction<?>) v);
+                                       } else {
+                                               throw new SqlExecutionException("Unsupported function type: " + v.getClass().getName());
                                        }
                                });
                        }

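The registration code above dispatches on the runtime type of the created function; the same pattern as a standalone sketch (the method name is illustrative):

    import org.apache.flink.table.functions.AggregateFunction;
    import org.apache.flink.table.functions.ScalarFunction;
    import org.apache.flink.table.functions.TableFunction;
    import org.apache.flink.table.functions.UserDefinedFunction;

    public class RegistrationDispatchSketch {
        static String kindOf(UserDefinedFunction v) {
            // Same instanceof cascade as in ExecutionContext above.
            if (v instanceof ScalarFunction) {
                return "scalar";
            } else if (v instanceof AggregateFunction) {
                return "aggregate";
            } else if (v instanceof TableFunction) {
                return "table";
            } else {
                throw new IllegalArgumentException("Unsupported function type: " + v.getClass().getName());
            }
        }
    }
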
http://git-wip-us.apache.org/repos/asf/flink/blob/abfdc1a2/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index bd463ac..d658ee9 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -191,6 +191,14 @@ public class LocalExecutor implements Executor {
        }
 
        @Override
+       public List<String> listUserDefinedFunctions(SessionContext session) throws SqlExecutionException {
+               final TableEnvironment tableEnv = getOrCreateExecutionContext(session)
+                       .createEnvironmentInstance()
+                       .getTableEnvironment();
+               return Arrays.asList(tableEnv.listUserDefinedFunctions());
+       }
+
+       @Override
        public TableSchema getTableSchema(SessionContext session, String name) throws SqlExecutionException {
                final TableEnvironment tableEnv = getOrCreateExecutionContext(session)
                        .createEnvironmentInstance()

http://git-wip-us.apache.org/repos/asf/flink/blob/abfdc1a2/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
index 8ba88ec..d1ea5a1 100644
--- a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
+++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.client.gateway.local;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.client.cli.DefaultCLI;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.client.config.Environment;
 import org.apache.flink.table.client.gateway.SessionContext;
 import org.apache.flink.table.client.gateway.utils.EnvironmentFileUtil;
@@ -28,8 +29,10 @@ import org.apache.flink.table.client.gateway.utils.EnvironmentFileUtil;
 import org.apache.commons.cli.Options;
 import org.junit.Test;
 
+import java.util.Arrays;
 import java.util.Collections;
 
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 
 /**
@@ -46,6 +49,17 @@ public class ExecutionContextTest {
                assertEquals(99, config.getAutoWatermarkInterval());
        }
 
+       @Test
+       public void testFunctions() throws Exception {
+               final ExecutionContext<?> context = createExecutionContext();
+               final TableEnvironment tableEnv = context.createEnvironmentInstance().getTableEnvironment();
+               final String[] expected = new String[]{"scalarUDF", "tableUDF", "aggregateUDF"};
+               final String[] actual = tableEnv.listUserDefinedFunctions();
+               Arrays.sort(expected);
+               Arrays.sort(actual);
+               assertArrayEquals(expected, actual);
+       }
+
        private <T> ExecutionContext<T> createExecutionContext() throws Exception {
                final Environment env = EnvironmentFileUtil.parseModified(
                        DEFAULTS_ENVIRONMENT_FILE,

http://git-wip-us.apache.org/repos/asf/flink/blob/abfdc1a2/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index b32353f..8f62be4 100644
--- a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -66,7 +66,6 @@ import static org.junit.Assert.assertTrue;
 public class LocalExecutorITCase extends TestLogger {
 
        private static final String DEFAULTS_ENVIRONMENT_FILE = "test-sql-client-defaults.yaml";
-       private static final String UDF_ENVIRONMENT_FILE = "test-sql-client-udf.yaml";
 
        private static final int NUM_TMS = 2;
        private static final int NUM_SLOTS_PER_TM = 2;
@@ -107,6 +106,17 @@ public class LocalExecutorITCase extends TestLogger {
        }
 
        @Test
+       public void testListUserDefinedFunctions() throws Exception {
+               final Executor executor = createDefaultExecutor(clusterClient);
+               final SessionContext session = new SessionContext("test-session", new Environment());
+
+               final List<String> actualFunctions = executor.listUserDefinedFunctions(session);
+
+               final List<String> expectedFunctions = Arrays.asList("aggregateUDF", "tableUDF", "scalarUDF");
+               assertEquals(expectedFunctions, actualFunctions);
+       }
+
+       @Test
        public void testGetSessionProperties() throws Exception {
                final Executor executor = createDefaultExecutor(clusterClient);
                final SessionContext session = new SessionContext("test-session", new Environment());
@@ -150,68 +160,6 @@ public class LocalExecutorITCase extends TestLogger {
        }
 
        @Test(timeout = 30_000L)
-       public void testScalarUDF() throws Exception {
-               final Executor executor =
-                               createDefaultExecutor(UDF_ENVIRONMENT_FILE, clusterClient);
-               final SessionContext session = new SessionContext("test-scalarUDF", new Environment());
-               final ResultDescriptor rd =
-                               executor.executeQuery(session, "SELECT scalarUDF(10)");
-               final List<String> actualResults =
-                               retrieveChangelogResult(executor, session, rd.getResultId());
-               final List<String> expectedResults = new ArrayList<>();
-               expectedResults.add("(true,15)");
-               TestBaseUtils.compareResultCollections(
-                               expectedResults, actualResults, Comparator.naturalOrder());
-       }
-
-       @Test(timeout = 30_000L)
-       public void testAggregateUDF() throws Exception {
-               final Executor executor =
-                               createDefaultExecutor(UDF_ENVIRONMENT_FILE, clusterClient);
-               final SessionContext session = new SessionContext("test-aggregateUDF", new Environment());
-               final ResultDescriptor rd =
-                               executor.executeQuery(session, "SELECT aggregateUDF(cast(1 as BIGINT))");
-               final List<String> actualResults =
-                               retrieveChangelogResult(executor, session, rd.getResultId());
-               final List<String> expectedResults = new ArrayList<>();
-               expectedResults.add("(true,100)");
-               TestBaseUtils.compareResultCollections(
-                               expectedResults, actualResults, Comparator.naturalOrder());
-       }
-
-       @Test(timeout = 30_000L)
-       public void testTableUDF() throws Exception {
-               final URL url = getClass().getClassLoader().getResource("test-data.csv");
-               Objects.requireNonNull(url);
-               final Map<String, String> replaceVars = new HashMap<>();
-               replaceVars.put("$VAR_0", url.getPath());
-               final Executor executor =
-                               createModifiedExecutor(UDF_ENVIRONMENT_FILE, clusterClient, replaceVars);
-
-               final SessionContext session = new SessionContext("test-aggregateUDF", new Environment());
-               final ResultDescriptor rd =
-                               executor.executeQuery(
-                                               session,
-                                               "SELECT w, l from TableNumber1, LATERAL TABLE(tableUDF(StringField1)) as T(w, l)");
-               final List<String> actualResults =
-                               retrieveChangelogResult(executor, session, rd.getResultId());
-               final List<String> expectedResults = new ArrayList<>();
-               expectedResults.add("(true,Hello,10)");
-               expectedResults.add("(true,World,10)");
-               expectedResults.add("(true,Hello,10)");
-               expectedResults.add("(true,World,10)");
-               expectedResults.add("(true,Hello,10)");
-               expectedResults.add("(true,World,10)");
-               expectedResults.add("(true,Hello,10)");
-               expectedResults.add("(true,World,10)");
-               expectedResults.add("(true,Hello,10)");
-               expectedResults.add("(true,World,10)");
-               expectedResults.add("(true,Hello,10)");
-               expectedResults.add("(true,World!!!!,14)");
-               TestBaseUtils.compareResultCollections(expectedResults, actualResults, Comparator.naturalOrder());
-       }
-
-       @Test(timeout = 30_000L)
        public void testStreamQueryExecutionChangelog() throws Exception {
                final URL url = getClass().getClassLoader().getResource("test-data.csv");
                Objects.requireNonNull(url);
@@ -226,7 +174,9 @@ public class LocalExecutorITCase extends TestLogger {
 
                try {
                        // start job and retrieval
-                       final ResultDescriptor desc = executor.executeQuery(session, "SELECT * FROM TableNumber1");
+                       final ResultDescriptor desc = executor.executeQuery(
+                               session,
+                               "SELECT scalarUDF(IntegerField1), StringField1 FROM TableNumber1");
 
                        assertFalse(desc.isMaterialized());
 
@@ -234,12 +184,12 @@ public class LocalExecutorITCase extends TestLogger {
                                        retrieveChangelogResult(executor, session, desc.getResultId());
 
                        final List<String> expectedResults = new ArrayList<>();
-                       expectedResults.add("(true,42,Hello World)");
-                       expectedResults.add("(true,22,Hello World)");
-                       expectedResults.add("(true,32,Hello World)");
-                       expectedResults.add("(true,32,Hello World)");
-                       expectedResults.add("(true,42,Hello World)");
-                       expectedResults.add("(true,52,Hello World!!!!)");
+                       expectedResults.add("(true,47,Hello World)");
+                       expectedResults.add("(true,27,Hello World)");
+                       expectedResults.add("(true,37,Hello World)");
+                       expectedResults.add("(true,37,Hello World)");
+                       expectedResults.add("(true,47,Hello World)");
+                       expectedResults.add("(true,57,Hello World!!!!)");
 
                        TestBaseUtils.compareResultCollections(expectedResults, actualResults, Comparator.naturalOrder());
                } finally {
@@ -262,19 +212,21 @@ public class LocalExecutorITCase extends TestLogger {
 
                try {
                        // start job and retrieval
-                       final ResultDescriptor desc = executor.executeQuery(session, "SELECT IntegerField1 FROM TableNumber1");
+                       final ResultDescriptor desc = executor.executeQuery(
+                               session,
+                               "SELECT scalarUDF(IntegerField1), StringField1 FROM TableNumber1");
 
                        assertTrue(desc.isMaterialized());
 
                        final List<String> actualResults = retrieveTableResult(executor, session, desc.getResultId());
 
                        final List<String> expectedResults = new ArrayList<>();
-                       expectedResults.add("42");
-                       expectedResults.add("22");
-                       expectedResults.add("32");
-                       expectedResults.add("32");
-                       expectedResults.add("42");
-                       expectedResults.add("52");
+                       expectedResults.add("47,Hello World");
+                       expectedResults.add("27,Hello World");
+                       expectedResults.add("37,Hello World");
+                       expectedResults.add("37,Hello World");
+                       expectedResults.add("47,Hello World");
+                       expectedResults.add("57,Hello World!!!!");
 
                        TestBaseUtils.compareResultCollections(expectedResults, actualResults, Comparator.naturalOrder());
                } finally {
@@ -317,32 +269,19 @@ public class LocalExecutorITCase extends TestLogger {
        }
 
        private <T> LocalExecutor createDefaultExecutor(ClusterClient<T> clusterClient) throws Exception {
-               return createDefaultExecutor(DEFAULTS_ENVIRONMENT_FILE, clusterClient);
-       }
-
-       private <T> LocalExecutor createDefaultExecutor(
-                       String configFileName, ClusterClient<T>
-                       clusterClient) throws Exception {
                return new LocalExecutor(
-                       EnvironmentFileUtil.parseModified(configFileName, Collections.singletonMap("$VAR_2", "batch")),
+                       EnvironmentFileUtil.parseModified(DEFAULTS_ENVIRONMENT_FILE, Collections.singletonMap("$VAR_2", "batch")),
                        Collections.emptyList(),
                        clusterClient.getFlinkConfiguration(),
                        new DummyCustomCommandLine<T>(clusterClient));
        }

        private <T> LocalExecutor createModifiedExecutor(ClusterClient<T> clusterClient, Map<String, String> replaceVars) throws Exception {
-               return createModifiedExecutor(DEFAULTS_ENVIRONMENT_FILE, clusterClient, replaceVars);
-       }
-
-       private <T> LocalExecutor createModifiedExecutor(
-                       String configFileName,
-                       ClusterClient<T> clusterClient,
-                       Map<String, String> replaceVars) throws Exception {
                return new LocalExecutor(
-                               EnvironmentFileUtil.parseModified(configFileName, replaceVars),
-                               Collections.emptyList(),
-                               clusterClient.getFlinkConfiguration(),
-                               new DummyCustomCommandLine<T>(clusterClient));
+                       EnvironmentFileUtil.parseModified(DEFAULTS_ENVIRONMENT_FILE, replaceVars),
+                       Collections.emptyList(),
+                       clusterClient.getFlinkConfiguration(),
+                       new DummyCustomCommandLine<T>(clusterClient));
        }
 
        private List<String> retrieveTableResult(
@@ -373,6 +312,7 @@ public class LocalExecutorITCase extends TestLogger {
                        Executor executor,
                        SessionContext session,
                        String resultID) throws InterruptedException {
+
                final List<String> actualResults = new ArrayList<>();
                while (true) {
                        Thread.sleep(50); // slow the processing down

http://git-wip-us.apache.org/repos/asf/flink/blob/abfdc1a2/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/UserDefinedFunctions.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/UserDefinedFunctions.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/UserDefinedFunctions.java
index b93ef66..9629c7f 100644
--- a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/UserDefinedFunctions.java
+++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/UserDefinedFunctions.java
@@ -27,12 +27,12 @@ import org.apache.flink.table.functions.TableFunction;
 import org.apache.flink.types.Row;
 
 /**
- * A bunch of UDFs for SQL-Client test.
+ * A bunch of UDFs for testing the SQL Client.
  */
 public class UserDefinedFunctions {
 
        /**
-        * The scalar function for SQL-Client test.
+        * A scalar function for testing the SQL Client.
         */
        public static class ScalarUDF extends ScalarFunction {
 
@@ -48,12 +48,12 @@ public class UserDefinedFunctions {
        }
 
        /**
-        * The aggregate function for SQL-Client test.
+        * An aggregate function for testing the SQL Client.
         */
        public static class AggregateUDF extends AggregateFunction<Long, Long> {
 
                public AggregateUDF(String name, Boolean flag, Integer value) {
-
+                       // do nothing
                }
 
                @Override
@@ -67,7 +67,7 @@ public class UserDefinedFunctions {
                }
 
                public void accumulate(Long acc, Long value) {
-
+                       // do nothing
                }
 
                @Override
@@ -77,10 +77,10 @@ public class UserDefinedFunctions {
        }
 
        /**
-        * The table function for SQL-Client test.
+        * A table function for testing the SQL Client.
         */
        public static class TableUDF extends TableFunction<Row> {
-               private long extra = 2L;
+               private long extra;
 
                public TableUDF(Long extra) {
                        this.extra = extra;
@@ -100,6 +100,4 @@ public class UserDefinedFunctions {
                        return Types.ROW(Types.STRING(), Types.LONG());
                }
        }
-
 }
-

http://git-wip-us.apache.org/repos/asf/flink/blob/abfdc1a2/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
index a9b4161..0d6778a 100644
--- a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
+++ b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
@@ -63,6 +63,31 @@ tables:
       line-delimiter: "\n"
       comment-prefix: "#"
 
+functions:
+  - name: scalarUDF
+    from: class
+    class: org.apache.flink.table.client.gateway.utils.UserDefinedFunctions$ScalarUDF
+    constructor:
+      - 5
+  - name: aggregateUDF
+    from: class
+    class: org.apache.flink.table.client.gateway.utils.UserDefinedFunctions$AggregateUDF
+    constructor:
+      - StarryName
+      - false
+      - class: java.lang.Integer
+        constructor:
+          - class: java.lang.String
+            constructor:
+              - type: VARCHAR
+                value: 3
+  - name: tableUDF
+    from: class
+    class: org.apache.flink.table.client.gateway.utils.UserDefinedFunctions$TableUDF
+    constructor:
+      - type: LONG
+        value: 5
+
 execution:
   type: "$VAR_2"
   time-characteristic: event-time

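The nested 'aggregateUDF' constructor above has a programmatic equivalent in the ClassInstance descriptor introduced by this commit; a sketch (shown from Java; the nesting mirrors the YAML exactly):

    import org.apache.flink.table.descriptors.ClassInstance;

    public class NestedConstructorSketch {
        public static void main(String[] args) {
            final ClassInstance aggregateUDF = new ClassInstance()
                .of("org.apache.flink.table.client.gateway.utils.UserDefinedFunctions$AggregateUDF")
                .parameter("StarryName")              // VARCHAR literal
                .parameter(false)                     // BOOLEAN literal
                .parameter(new ClassInstance()        // nested class instance
                    .of("java.lang.Integer")
                    .parameter(new ClassInstance()
                        .of("java.lang.String")
                        .parameter("VARCHAR", "3"))); // explicitly typed literal
        }
    }
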
http://git-wip-us.apache.org/repos/asf/flink/blob/abfdc1a2/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-udf.yaml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-udf.yaml b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-udf.yaml
deleted file mode 100644
index 3e16030..0000000
--- a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-udf.yaml
+++ /dev/null
@@ -1,82 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-#==============================================================================
-# TEST ENVIRONMENT FILE
-# Intended for org.apache.flink.table.client.gateway.local.LocalExecutorITCase.
-#==============================================================================
-
-# this file has variables that can be filled with content by replacing $VAR_XXX
-
-tables:
-  - name: TableNumber1
-    type: source
-    schema:
-      - name: IntegerField1
-        type: INT
-      - name: StringField1
-        type: VARCHAR
-    connector:
-      type: filesystem
-      path: "$VAR_0"
-    format:
-      type: csv
-      fields:
-        - name: IntegerField1
-          type: INT
-        - name: StringField1
-          type: VARCHAR
-      line-delimiter: "\n"
-      comment-prefix: "#"
-
-functions:
-  - name: scalarUDF
-    from: class
-    class: org.apache.flink.table.client.gateway.utils.UserDefinedFunctions$ScalarUDF
-    constructor:
-      - 5
-
-  - name: aggregateUDF
-    from: class
-    class: org.apache.flink.table.client.gateway.utils.UserDefinedFunctions$AggregateUDF
-    constructor:
-      - StarryName
-      - false
-      - class: java.lang.Integer
-        constructor:
-          - class: java.lang.String
-            constructor:
-              - type: VARCHAR
-                value: 3
-
-  - name: tableUDF
-    from: class
-    class: org.apache.flink.table.client.gateway.utils.UserDefinedFunctions$TableUDF
-    constructor:
-      - type: LONG
-        value: 5
-
-execution:
-  type: streaming
-  parallelism: 1
-  result-mode: changelog
-
-deployment:
-  response-timeout: 5000
-
-

http://git-wip-us.apache.org/repos/asf/flink/blob/abfdc1a2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ClassInstance.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ClassInstance.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ClassInstance.scala
new file mode 100644
index 0000000..007f2ba
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ClassInstance.scala
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.Types
+
+import scala.collection.mutable.ArrayBuffer
+
+/**
+  * Descriptor for a class instance. A class instance is a Java/Scala object created from a class
+  * with a public constructor (with or without parameters).
+  */
+class ClassInstance extends HierarchyDescriptor {
+
+  private var className: Option[String] = None
+
+  // the parameter is either a literal value or the instance of a class
+  private val constructor: ArrayBuffer[Either[LiteralValue, ClassInstance]] = ArrayBuffer()
+
+  /**
+    * Sets the fully qualified class name for creating an instance.
+    *
+    * E.g. org.example.MyClass or org.example.MyClass$StaticInnerClass
+    *
+    * @param className fully qualified class name
+    */
+  def of(className: String): ClassInstance = {
+    this.className = Option(className)
+    this
+  }
+
+  /**
+    * Adds a constructor parameter value of literal type. The type is automatically derived from
+    * the value. Currently, this is supported for: BOOLEAN, INT, DOUBLE, and VARCHAR. Expression
+    * values are not allowed.
+    *
+    * Examples:
+    *   - "true", "false" -> BOOLEAN
+    *   - "42", "-5" -> INT
+    *   - "2.0", "1234.222" -> DOUBLE
+    *   - VARCHAR otherwise
+    *
+    * For other types and explicit type declaration use [[parameter(String, String)]] or
+    * [[parameter(TypeInformation, String)]].
+    *
+    */
+  def parameterString(valueString: String): ClassInstance = {
+    constructor += Left(new LiteralValue().value(valueString))
+    this
+  }
+
+  /**
+    * Adds a constructor parameter value of literal type. The type is explicitly defined using a
+    * type string such as VARCHAR, FLOAT, BOOLEAN, INT, BIGINT, etc. The value is parsed
+    * accordingly. Expression values are not allowed.
+    *
+    * @param typeString the type string that defines how to parse the given value string
+    * @param valueString the literal value to be parsed
+    */
+  def parameter(typeString: String, valueString: String): ClassInstance = {
+    constructor += Left(new LiteralValue().of(typeString).value(valueString))
+    this
+  }
+
+  /**
+    * Adds a constructor parameter value of literal type. The type is explicitly defined using
+    * type information. The value is parsed accordingly. Expression values are not allowed.
+    *
+    * @param typeInfo the type that defines how to parse the given value string
+    * @param valueString the literal value to be parsed
+    */
+  def parameter(typeInfo: TypeInformation[_], valueString: String): ClassInstance = {
+    constructor += Left(new LiteralValue().of(typeInfo).value(valueString))
+    this
+  }
+
+  /**
+    * Adds a constructor parameter value of BOOLEAN type.
+    *
+    * @param value BOOLEAN value
+    */
+  def parameter(value: Boolean): ClassInstance = {
+    constructor += Left(new LiteralValue().of(Types.BOOLEAN).value(value))
+    this
+  }
+
+  /**
+    * Adds a constructor parameter value of DOUBLE type.
+    *
+    * @param value DOUBLE value
+    */
+  def parameter(value: Double): ClassInstance = {
+    constructor += Left(new LiteralValue().of(Types.DOUBLE).value(value))
+    this
+  }
+
+  /**
+    * Adds a constructor parameter value of FLOAT type.
+    *
+    * @param value FLOAT value
+    */
+  def parameter(value: Float): ClassInstance = {
+    constructor += Left(new LiteralValue().of(Types.FLOAT).value(value))
+    this
+  }
+
+  /**
+    * Adds a constructor parameter value of INT type.
+    *
+    * @param value INT value
+    */
+  def parameter(value: Int): ClassInstance = {
+    constructor += Left(new LiteralValue().of(Types.INT).value(value))
+    this
+  }
+
+  /**
+    * Adds a constructor parameter value of VARCHAR type.
+    *
+    * @param value VARCHAR value
+    */
+  def parameter(value: String): ClassInstance = {
+    constructor += Left(new LiteralValue().of(Types.STRING).value(value))
+    this
+  }
+
+  /**
+    * Adds a constructor parameter value of BIGINT type.
+    *
+    * @param value BIGINT value
+    */
+  def parameter(value: Long): ClassInstance = {
+    constructor += Left(new LiteralValue().of(Types.LONG).value(value))
+    this
+  }
+
+  /**
+    * Adds a constructor parameter value of TINYINT type.
+    *
+    * @param value TINYINT value
+    */
+  def parameter(value: Byte): ClassInstance = {
+    constructor += Left(new LiteralValue().of(Types.BYTE).value(value))
+    this
+  }
+
+  /**
+    * Adds a constructor parameter value of SMALLINT type.
+    *
+    * @param value SMALLINT value
+    */
+  def parameter(value: Short): ClassInstance = {
+    constructor += Left(new LiteralValue().of(Types.SHORT).value(value))
+    this
+  }
+
+  /**
+    * Adds a constructor parameter value of DECIMAL type.
+    *
+    * @param value DECIMAL value
+    */
+  def parameter(value: java.math.BigDecimal): ClassInstance = {
+    constructor += Left(new LiteralValue().of(Types.DECIMAL).value(value))
+    this
+  }
+
+  /**
+    * Adds a constructor parameter value of a class instance (i.e. a Java object with a public
+    * constructor).
+    *
+    * @param classInstance description of a class instance (i.e. a Java object with a public
+    *                      constructor).
+    */
+  def parameter(classInstance: ClassInstance): ClassInstance = {
+    constructor += Right(classInstance)
+    this
+  }
+
+  /**
+    * Internal method for properties conversion.
+    */
+  override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
+    addPropertiesWithPrefix(HierarchyDescriptorValidator.EMPTY_PREFIX, properties)
+  }
+
+  /**
+    * Internal method for properties conversion.
+    */
+  override private[flink] def addPropertiesWithPrefix(
+      keyPrefix: String,
+      properties: DescriptorProperties): Unit = {
+    className.foreach(properties.putString(s"$keyPrefix${ClassInstanceValidator.CLASS}", _))
+    var i = 0
+    while (i < constructor.size) {
+      constructor(i) match {
+        case Left(literalValue) =>
+          literalValue.addPropertiesWithPrefix(
+            s"$keyPrefix${ClassInstanceValidator.CONSTRUCTOR}.$i.",
+            properties)
+        case Right(classInstance) =>
+          classInstance.addPropertiesWithPrefix(
+            s"$keyPrefix${ClassInstanceValidator.CONSTRUCTOR}.$i.",
+            properties)
+      }
+      i += 1
+    }
+  }
+}
+
+/**
+  * Descriptor for a class instance. A class instance is a Java/Scala object created from a class
+  * with a public constructor (with or without parameters).
+  */
+object ClassInstance {
+
+  /**
+    * Descriptor for a class instance. A class instance is a Java/Scala object created from a class
+    * with a public constructor (with or without parameters).
+    */
+  def apply() = new ClassInstance
+}
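
A minimal usage sketch for the descriptor above; the class names are hypothetical and
only illustrate the nesting (describing 'new StrLen(new Tokenizer(","), 10)'):

    val instance: ClassInstance = ClassInstance()
      .of("org.example.StrLen")          // fully qualified class name
      .parameter(                        // nested class instance parameter
        ClassInstance()
          .of("org.example.Tokenizer")
          .parameter(","))               // VARCHAR parameter
      .parameter(10)                     // INT parameter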

http://git-wip-us.apache.org/repos/asf/flink/blob/abfdc1a2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ClassInstanceValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ClassInstanceValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ClassInstanceValidator.scala
new file mode 100644
index 0000000..d9c3a02
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ClassInstanceValidator.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.descriptors.HierarchyDescriptorValidator.EMPTY_PREFIX
+
+import scala.collection.JavaConversions._
+
+/**
+  * Validator for [[ClassInstance]].
+  */
+class ClassInstanceValidator(keyPrefix: String = EMPTY_PREFIX)
+  extends HierarchyDescriptorValidator(keyPrefix) {
+
+  override def validateWithPrefix(keyPrefix: String, properties: DescriptorProperties): Unit = {
+    // check class name
+    properties.validateString(s"$keyPrefix${ClassInstanceValidator.CLASS}", isOptional = false, 1)
+
+    // check constructor
+    val constructorPrefix = s"$keyPrefix${ClassInstanceValidator.CONSTRUCTOR}"
+
+    val constructorProperties = properties.getVariableIndexedProperties(constructorPrefix, List())
+    var i = 0
+    while (i < constructorProperties.size()) {
+      // nested class instance
+      if (constructorProperties(i).containsKey(ClassInstanceValidator.CLASS)) {
+        val classInstanceValidator = new ClassInstanceValidator(s"$constructorPrefix.$i.")
+        classInstanceValidator.validate(properties)
+      }
+      // literal value
+      else {
+        val literalValueValidator = new LiteralValueValidator(s"$constructorPrefix.$i.")
+        literalValueValidator.validate(properties)
+      }
+      i += 1
+    }
+  }
+}
+
+object ClassInstanceValidator {
+  val CLASS = "class"
+  val CONSTRUCTOR = "constructor"
+}
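
For orientation, a sketch of the flat property keys this validator accepts for the
hypothetical instance above, following the 'class'/'constructor.#' scheme defined here:

    val props = new DescriptorProperties()
    props.putString("class", "org.example.StrLen")
    props.putString("constructor.0.class", "org.example.Tokenizer")
    props.putString("constructor.0.constructor.0", ",")  // literal with implicit type
    props.putString("constructor.1.type", "INT")         // literal with explicit type
    props.putString("constructor.1.value", "10")
    new ClassInstanceValidator().validate(props)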

http://git-wip-us.apache.org/repos/asf/flink/blob/abfdc1a2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ClassType.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ClassType.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ClassType.scala
deleted file mode 100644
index 78ceb3a..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ClassType.scala
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.typeutils.TypeStringUtils
-
-import scala.collection.mutable.ArrayBuffer
-
-/**
-  * Descriptor for a class type.
-  *
-  * @param className the full name of the class (e.g., java.lang.Integer)
-  */
-class ClassType(var className: Option[String] = None)
-  extends HierarchyDescriptor {
-
-  // the parameter is either a primitive type or a class type
-  private val constructor: ArrayBuffer[Either[PrimitiveType[_], ClassType]] =
-    ArrayBuffer()
-
-  /**
-    * Sets the class name for the descriptor.
-    */
-  def of(name: String): ClassType = {
-    this.className = Option(name)
-    this
-  }
-
-  /**
-    * Adds the given string formatted value as a parameter, the type of which 
will be automatically
-    * derived (e.g., "true" -> Boolean, "1" -> Integer, "2.0" -> Double and 
"abc" -> String).
-    *
-    */
-  def strParam(valueStr: String): ClassType = {
-    val typeString = PrimitiveTypeValidator.deriveTypeStrFromValueStr(valueStr)
-    param(typeString, valueStr)
-  }
-
-  /**
-    * Adds the given string formatted value as a parameter, the type of which 
will be decided by the
-    * given type string (e.g., "DOUBLE", "VARCHAR").
-    */
-  def param(typeStr: String, valueStr: String): ClassType = {
-    param(TypeStringUtils.readTypeInfo(typeStr), valueStr)
-  }
-
-  /**
-    * Adds the give string formatted value as a parameter, the type of which 
will be defined by the
-    * given type information.
-    */
-  def param[T](typeInfo: TypeInformation[T], valueStr: String): ClassType = {
-    constructor += Left(
-      new PrimitiveType[T]()
-        .of(typeInfo)
-        .value(PrimitiveTypeValidator.deriveTypeAndValueStr(typeInfo, 
valueStr)))
-    this
-  }
-
-  /**
-    * Adds the give value as a parameter, the type of which will be 
automatically derived.
-    */
-  def param[T](value: T): ClassType = {
-    constructor += Left(
-      new PrimitiveType[T]()
-        .of(TypeInformation.of(value.getClass.asInstanceOf[Class[T]]))
-        .value(value))
-    this
-  }
-
-  /**
-    * Adds a parameter defined by the given class type descriptor.
-    */
-  def param(field: ClassType): ClassType = {
-    constructor += Right(field)
-    this
-  }
-
-  override private[flink] def addProperties(properties: DescriptorProperties): 
Unit =  {
-    addPropertiesWithPrefix("", properties)
-  }
-
-  override private[flink] def addPropertiesWithPrefix(
-      keyPrefix: String,
-      properties: DescriptorProperties): Unit = {
-    
className.foreach(properties.putString(s"$keyPrefix${ClassTypeValidator.CLASS}",
 _))
-    var i = 0
-    while (i < constructor.size) {
-      constructor(i) match {
-        case Left(basicType) =>
-          basicType.addPropertiesWithPrefix(
-            s"$keyPrefix${ClassTypeValidator.CONSTRUCTOR}.$i.",
-            properties)
-        case Right(classType) =>
-          classType.addPropertiesWithPrefix(
-            s"$keyPrefix${ClassTypeValidator.CONSTRUCTOR}.$i.",
-            properties)
-      }
-      i += 1
-    }
-  }
-}
-
-object ClassType {
-  def apply() = new ClassType
-  def apply(className: String) = new ClassType(Option(className))
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/abfdc1a2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ClassTypeValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ClassTypeValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ClassTypeValidator.scala
deleted file mode 100644
index ebb48b5..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ClassTypeValidator.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors
-
-import org.apache.flink.table.api.ValidationException
-
-import scala.collection.JavaConversions._
-
-/**
-  * Validator for [[ClassType]].
-  */
-class ClassTypeValidator extends HierarchyDescriptorValidator {
-  override def validateWithPrefix(keyPrefix: String, properties: 
DescriptorProperties): Unit = {
-
-    properties.validateString(s"$keyPrefix${ClassTypeValidator.CLASS}", 
isOptional = false)
-
-    val constructorPrefix = s"$keyPrefix${ClassTypeValidator.CONSTRUCTOR}"
-    normalizeConstructorParams(constructorPrefix, properties)
-
-    val constructorProps =
-      properties.getVariableIndexedProperties(constructorPrefix, List())
-    var i = 0
-    val primitiveTypeValidator = new PrimitiveTypeValidator
-    while (i < constructorProps.size()) {
-      if 
(constructorProps(i).containsKey(PrimitiveTypeValidator.PRIMITIVE_TYPE)) {
-        primitiveTypeValidator.validateWithPrefix(s"$constructorPrefix.$i.", 
properties)
-      } else if (constructorProps(i).containsKey(ClassTypeValidator.CLASS)) {
-        validateWithPrefix(s"$constructorPrefix.$i.", properties)
-      } else {
-        throw ValidationException("A constructor field must contain a 'class' 
or a 'type' key.")
-      }
-      i += 1
-    }
-  }
-
-  /**
-    * For each constructor parameter (e.g., constructor.0 = abc), we derive 
its type and replace it
-    * with the normalized form (e.g., constructor.0.type = VARCHAR, 
constructor.0.value = abc);
-    *
-    * @param constructorPrefix the prefix to get the constructor parameters
-    * @param properties the descriptor properties
-    */
-  def normalizeConstructorParams(
-    constructorPrefix: String,
-    properties: DescriptorProperties): Unit = {
-    val constructorValues = properties.getListProperties(constructorPrefix)
-    constructorValues.foreach(kv => {
-      properties.unsafeRemove(kv._1)
-      val tp = PrimitiveTypeValidator.deriveTypeStrFromValueStr(kv._2)
-      
properties.putString(s"${kv._1}.${PrimitiveTypeValidator.PRIMITIVE_TYPE}", tp)
-      
properties.putString(s"${kv._1}.${PrimitiveTypeValidator.PRIMITIVE_VALUE}", 
kv._2)
-    })
-  }
-}
-
-object ClassTypeValidator {
-  val CLASS = "class"
-  val CONSTRUCTOR = "constructor"
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/abfdc1a2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Descriptor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Descriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Descriptor.scala
index ad97ded..e21527b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Descriptor.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Descriptor.scala
@@ -19,8 +19,12 @@
 package org.apache.flink.table.descriptors
 
 /**
-  * A class that adds a set of string-based, normalized properties for describing a
-  * table source or table sink.
+  * A class that adds a set of string-based, normalized properties for describing DDL information.
+  *
+  * Typical characteristics of a descriptor are:
+  * - descriptors have a default constructor and a default 'apply()' method for Scala
+  * - descriptors themselves contain very little logic
+  * - corresponding validators validate the correctness (goal: have a single point of validation)
   */
 abstract class Descriptor {
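
A sketch of these conventions applied to a hypothetical descriptor (the names are
illustrative and the class is assumed to sit alongside the built-in descriptors):

    class Connection extends Descriptor {
      private var url: Option[String] = None

      def url(u: String): Connection = { this.url = Option(u); this }

      // no validation logic here; a separate ConnectionValidator checks the value
      override def addProperties(properties: DescriptorProperties): Unit =
        url.foreach(properties.putString("connection.url", _))
    }

    object Connection { def apply() = new Connection }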
 

http://git-wip-us.apache.org/repos/asf/flink/blob/abfdc1a2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
index 8d410d0..4f5bd81 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
@@ -249,7 +249,7 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
   /**
     * Returns a big decimal value under the given existing key.
     */
-  def getBigDecimal(key: String): BigDecimal = {
+  def getBigDecimal(key: String): JBigDecimal = {
     getOptionalBigDecimal(key).orElseThrow(exceptionSupplier(key))
   }
 
@@ -292,7 +292,7 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
   }
 
   /**
-    * Returns a double value under the given key if it exists.
+    * Returns a double value under the given existing key.
     */
   def getDouble(key: String): Double = {
     getOptionalDouble(key).orElseThrow(exceptionSupplier(key))
@@ -307,7 +307,7 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
   }
 
   /**
-    * Returns a float value under the given key if it exists.
+    * Returns a float value under the given existing key.
     */
   def getFloat(key: String): Float = {
     getOptionalFloat(key).orElseThrow(exceptionSupplier(key))
@@ -489,13 +489,13 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
 
     // filter for index
     val escapedKey = Pattern.quote(key)
-    val pattern = Pattern.compile(s"$escapedKey\\.(\\d+)\\.(.*)")
+    val pattern = Pattern.compile(s"$escapedKey\\.(\\d+)(\\.)?(.*)")
 
     // extract index and property keys
     val indexes = properties.keys.flatMap { k =>
       val matcher = pattern.matcher(k)
       if (matcher.find()) {
-        Some((JInt.parseInt(matcher.group(1)), matcher.group(2)))
+        Some((JInt.parseInt(matcher.group(1)), matcher.group(3)))
       } else {
         None
       }
@@ -531,16 +531,6 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
   }
 
   /**
-    * Returns all properties under a group of array formed keys.
-    *
-    * E.g. constructor -> returns all constructor.# properties.
-    */
-  def getListProperties(key: String): JMap[String, String] = {
-    val escapedKey = Pattern.quote(key)
-    properties.filterKeys(k => k.matches(s"$escapedKey\\.\\d+")).asJava
-  }
-
-  /**
     * Returns all properties under a given key that contains an index in between.
     *
     * E.g. rowtime.0.name -> returns all rowtime.#.name properties
@@ -744,7 +734,8 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
     */
   def validateBigDecimal(
       key: String,
-      isOptional: Boolean): Unit = {
+      isOptional: Boolean)
+    : Unit = {
 
     if (!properties.contains(key)) {
       if (!isOptional) {
@@ -767,14 +758,14 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
   def validateBigDecimal(
       key: String,
       isOptional: Boolean,
-      min: BigDecimal, // inclusive
-      max: BigDecimal) // inclusive
-  : Unit = {
+      min: JBigDecimal, // inclusive
+      max: JBigDecimal) // inclusive
+    : Unit = {
     validateComparable(
       key,
       isOptional,
-      min.bigDecimal,
-      max.bigDecimal,
+      min,
+      max,
       (value: String) => new JBigDecimal(value))
   }
 
@@ -783,7 +774,7 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
     */
   def validateByte(
       key: String,
-      isOptional: Boolean): Unit = validateDouble(key, isOptional, Byte.MinValue, Byte.MaxValue)
+      isOptional: Boolean): Unit = validateByte(key, isOptional, Byte.MinValue, Byte.MaxValue)
 
   /**
     * Validates a byte property. The boundaries are inclusive.
@@ -792,7 +783,7 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
       key: String,
       isOptional: Boolean,
       min: Byte) // inclusive
-  : Unit = {
+    : Unit = {
     validateByte(key, isOptional, min, Byte.MaxValue)
   }
 
@@ -804,7 +795,7 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
       isOptional: Boolean,
       min: Byte, // inclusive
       max: Byte) // inclusive
-  : Unit = {
+    : Unit = {
     validateComparable(key, isOptional, new JByte(min), new JByte(max), JByte.valueOf)
   }
 
@@ -822,7 +813,7 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
       key: String,
       isOptional: Boolean,
       min: Float) // inclusive
-  : Unit = {
+    : Unit = {
     validateFloat(key, isOptional, min, Float.MaxValue)
   }
 
@@ -834,7 +825,7 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
       isOptional: Boolean,
       min: Float, // inclusive
       max: Float) // inclusive
-  : Unit = {
+    : Unit = {
     validateComparable(key, isOptional, new JFloat(min), new JFloat(max), JFloat.valueOf)
   }
 
@@ -848,58 +839,27 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
   /**
     * Validates a short property. The boundaries are inclusive.
     */
-  def validateFloat(
+  def validateShort(
       key: String,
       isOptional: Boolean,
       min: Short) // inclusive
-  : Unit = {
+    : Unit = {
     validateShort(key, isOptional, min, Short.MaxValue)
   }
 
   /**
-    * Validates a float property. The boundaries are inclusive.
+    * Validates a short property. The boundaries are inclusive.
     */
   def validateShort(
       key: String,
       isOptional: Boolean,
       min: Short, // inclusive
       max: Short) // inclusive
-  : Unit = {
+    : Unit = {
     validateComparable(key, isOptional, new JShort(min), new JShort(max), JShort.valueOf)
   }
 
   /**
-    * Validates a property by first parsing the string value to a comparable 
object.
-    * The boundaries are inclusive.
-    */
-  private def validateComparable[T <: Comparable[T]](
-      key: String,
-      isOptional: Boolean,
-      min: T,
-      max: T,
-      parseFunction: String => T)
-  : Unit = {
-    if (!properties.contains(key)) {
-      if (!isOptional) {
-        throw new ValidationException(s"Could not find required property 
'$key'.")
-      }
-    } else {
-      try {
-        val value = parseFunction(properties(key))
-
-        if (value.compareTo(min) < 0 || value.compareTo(max) > 0) {
-          throw new ValidationException(s"Property '$key' must be a 
${min.getClass.getSimpleName}" +
-            s" value between $min and $max but was: ${properties(key)}")
-        }
-      } catch {
-        case _: NumberFormatException =>
-          throw new ValidationException(
-            s"Property '$key' must be a byte value but was: 
${properties(key)}")
-      }
-    }
-  }
-
-  /**
     * Validation for variable indexed properties.
     *
     * For example:
@@ -1211,6 +1171,38 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
       }
     }
   }
+
+  /**
+    * Validates a property by first parsing the string value to a comparable object.
+    * The boundaries are inclusive.
+    */
+  private def validateComparable[T <: Comparable[T]](
+      key: String,
+      isOptional: Boolean,
+      min: T,
+      max: T,
+      parseFunction: String => T)
+    : Unit = {
+    if (!properties.contains(key)) {
+      if (!isOptional) {
+        throw new ValidationException(s"Could not find required property '$key'.")
+      }
+    } else {
+      val typeName = min.getClass.getSimpleName
+      try {
+        val value = parseFunction(properties(key))
+
+        if (value.compareTo(min) < 0 || value.compareTo(max) > 0) {
+          throw new ValidationException(s"Property '$key' must be a $typeName" +
+            s" value between $min and $max but was: ${properties(key)}")
+        }
+      } catch {
+        case _: NumberFormatException =>
+          throw new ValidationException(
+            s"Property '$key' must be a $typeName value but was: 
${properties(key)}")
+      }
+    }
+  }
 }
 
 object DescriptorProperties {
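
A small sketch of what the relaxed key pattern in getVariableIndexedProperties now
matches: both a plain indexed key and an indexed key with a subkey yield an index,
with the subkey empty in the first case (the keys are illustrative):

    import java.util.regex.Pattern

    val pattern = Pattern.compile(Pattern.quote("constructor") + "\\.(\\d+)(\\.)?(.*)")
    for (key <- Seq("constructor.0", "constructor.0.class")) {
      val m = pattern.matcher(key)
      if (m.find()) {
        // prints: index=0 subkey='' and index=0 subkey='class'
        println(s"index=${m.group(1)} subkey='${m.group(3)}'")
      }
    }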

http://git-wip-us.apache.org/repos/asf/flink/blob/abfdc1a2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FunctionDescriptor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FunctionDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FunctionDescriptor.scala
index f4c8363..13d5056 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FunctionDescriptor.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FunctionDescriptor.scala
@@ -19,34 +19,38 @@
 package org.apache.flink.table.descriptors
 
 /**
-  * Descriptor for describing a function that can be instantiated from somewhere (e.g., a class).
-  *
-  * @param name name of the function
+  * Descriptor for describing a function.
   */
-class FunctionDescriptor(var name: String) extends Descriptor {
+class FunctionDescriptor extends Descriptor {
 
-  var classDescriptor: Option[ClassType] = None
+  private var from: Option[String] = None
+  private var classInstance: Option[ClassInstance] = None
 
   /**
-    * Uses the class provided by the descriptor to instantiate the function.
+    * Creates a function from a class description.
     */
-  def using(classDescriptor: ClassType): FunctionDescriptor = {
-    this.classDescriptor = Option(classDescriptor)
+  def fromClass(classType: ClassInstance): FunctionDescriptor = {
+    from = Some(FunctionDescriptorValidator.FROM_VALUE_CLASS)
+    this.classInstance = Option(classType)
     this
   }
 
-  def getDescriptorProperties: DescriptorProperties = {
-    val descriptorProperties = new DescriptorProperties()
-    addProperties(descriptorProperties)
-    descriptorProperties
-  }
-
+  /**
+    * Internal method for properties conversion.
+    */
   override def addProperties(properties: DescriptorProperties): Unit = {
-    properties.putString(FunctionValidator.FUNCTION_NAME, name)
-    classDescriptor.foreach(_.addProperties(properties))
+    from.foreach(properties.putString(FunctionDescriptorValidator.FROM, _))
+    classInstance.foreach(_.addProperties(properties))
   }
 }
 
+/**
+  * Descriptor for describing a function.
+  */
 object FunctionDescriptor {
-  def apply(name: String): FunctionDescriptor = new FunctionDescriptor(name)
+
+  /**
+    * Descriptor for describing a function.
+    */
+  def apply(): FunctionDescriptor = new FunctionDescriptor()
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/abfdc1a2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FunctionDescriptorValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FunctionDescriptorValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FunctionDescriptorValidator.scala
new file mode 100644
index 0000000..c119777
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FunctionDescriptorValidator.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.descriptors.DescriptorProperties.toJava
+import org.apache.flink.table.descriptors.FunctionDescriptorValidator.FROM
+
+import scala.collection.JavaConverters._
+
+/**
+  * Validator for [[FunctionDescriptor]].
+  */
+class FunctionDescriptorValidator extends DescriptorValidator {
+
+  override def validate(properties: DescriptorProperties): Unit = {
+
+    val classValidation = (_: String) => {
+      new ClassInstanceValidator().validate(properties)
+    }
+
+    // check for 'from'
+    if (properties.containsKey(FROM)) {
+      properties.validateEnum(
+        FROM,
+        isOptional = false,
+        Map(
+          FunctionDescriptorValidator.FROM_VALUE_CLASS -> toJava(classValidation)
+        ).asJava
+      )
+    } else {
+      throw new ValidationException("Could not find 'from' property for function.")
+    }
+  }
+}
+
+object FunctionDescriptorValidator {
+
+  val FROM = "from"
+  val FROM_VALUE_CLASS = "class"
+}
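
For orientation, the smallest property set this validator accepts (the class name is
an assumption for illustration):

    val props = new DescriptorProperties()
    props.putString("from", "class")
    props.putString("class", "org.example.MyScalarFunction")
    new FunctionDescriptorValidator().validate(props)

Omitting 'from' now fails fast with the "Could not find 'from' property" error above.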

http://git-wip-us.apache.org/repos/asf/flink/blob/abfdc1a2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FunctionValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FunctionValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FunctionValidator.scala
deleted file mode 100644
index f36b9c2..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FunctionValidator.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors
-
-import org.apache.flink.table.functions.UserDefinedFunction
-
-import scala.collection.JavaConverters._
-
-/**
-  * Validator for [[FunctionDescriptor]].
-  */
-class FunctionValidator extends DescriptorValidator {
-
-  override def validate(properties: DescriptorProperties): Unit = {
-    properties.validateString(FunctionValidator.FUNCTION_NAME, isOptional = 
false, 1)
-    new ClassTypeValidator().validate(properties)
-  }
-}
-
-object FunctionValidator {
-
-  val FUNCTION_NAME = "name"
-
-}
-
-

http://git-wip-us.apache.org/repos/asf/flink/blob/abfdc1a2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptor.scala
index f97c807..b958b1c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptor.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptor.scala
@@ -25,10 +25,11 @@ package org.apache.flink.table.descriptors
 abstract class HierarchyDescriptor extends Descriptor {
 
   /**
-    * Internal method for properties conversion. All the property keys will be prefixed according
-    * to the level.
+    * Internal method for properties conversion. All the property keys will be prefixed with the
+    * given key prefix.
     */
   private[flink] def addPropertiesWithPrefix(
-    keyPrefix: String, properties: DescriptorProperties): Unit
+    keyPrefix: String,
+    properties: DescriptorProperties): Unit
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/abfdc1a2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptorValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptorValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptorValidator.scala
index 73dd1f9..d7bf573 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptorValidator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptorValidator.scala
@@ -20,16 +20,22 @@ package org.apache.flink.table.descriptors
 
 /**
   * Validator for a [[HierarchyDescriptor]].
+  *
+  * @param keyPrefix prefix to be added to every property before validation
   */
-trait HierarchyDescriptorValidator extends DescriptorValidator{
+abstract class HierarchyDescriptorValidator(keyPrefix: String) extends DescriptorValidator {
 
-  def validate(properties: DescriptorProperties): Unit = {
-    validateWithPrefix("", properties)
+  final def validate(properties: DescriptorProperties): Unit = {
+    validateWithPrefix(keyPrefix, properties)
   }
 
   /**
     * Performs validation with a prefix for the keys.
     */
-  def validateWithPrefix(keyPrefix: String, properties: DescriptorProperties): Unit
+  protected def validateWithPrefix(keyPrefix: String, properties: DescriptorProperties): Unit
 
 }
+
+object HierarchyDescriptorValidator {
+  val EMPTY_PREFIX = ""
+}
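
A sketch of the new contract with a hypothetical validator: the prefix is fixed at
construction time, so nested validators can be instantiated for sub-hierarchies
without re-threading the prefix through every call:

    class ColorValidator(keyPrefix: String = HierarchyDescriptorValidator.EMPTY_PREFIX)
      extends HierarchyDescriptorValidator(keyPrefix) {

      override protected def validateWithPrefix(
          keyPrefix: String,
          properties: DescriptorProperties): Unit = {
        properties.validateString(s"${keyPrefix}color", isOptional = false, 1)
      }
    }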

http://git-wip-us.apache.org/repos/asf/flink/blob/abfdc1a2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/LiteralValue.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/LiteralValue.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/LiteralValue.scala
new file mode 100644
index 0000000..ac86ba7
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/LiteralValue.scala
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.typeutils.TypeStringUtils
+import org.apache.flink.util.Preconditions
+
+/**
+  * Descriptor for a literal value. A literal value consists of a type and the actual value.
+  * Expression values are not allowed.
+  *
+  * If no type is set, the type is automatically derived from the value. Currently,
+  * this is supported for: BOOLEAN, INT, DOUBLE, and VARCHAR.
+  *
+  * Examples:
+  *   - "true", "false" -> BOOLEAN
+  *   - "42", "-5" -> INT
+  *   - "2.0", "1234.222" -> DOUBLE
+  *   - VARCHAR otherwise
+  */
+class LiteralValue extends HierarchyDescriptor {
+
+  var typeInfo: Option[String] = None
+  var value: Option[Any] = None
+
+  /**
+    * Type information of the literal value. E.g. Types.BOOLEAN.
+    *
+    * @param typeInfo type information describing the value
+    */
+  def of(typeInfo: TypeInformation[_]): LiteralValue = {
+    Preconditions.checkNotNull(typeInfo, "Type information must not be null.")
+    this.typeInfo = Option(TypeStringUtils.writeTypeInfo(typeInfo))
+    this
+  }
+
+  /**
+    * Type string of the literal value. E.g. "BOOLEAN".
+    *
+    * @param typeString type string describing the value
+    */
+  def of(typeString: String): LiteralValue = {
+    this.typeInfo = Option(typeString)
+    this
+  }
+
+  /**
+    * Literal BOOLEAN value.
+    *
+    * @param value literal BOOLEAN value
+    */
+  def value(value: Boolean): LiteralValue = {
+    this.value = Option(value)
+    this
+  }
+
+  /**
+    * Literal INT value.
+    *
+    * @param value literal INT value
+    */
+  def value(value: Int): LiteralValue = {
+    this.value = Option(value)
+    this
+  }
+
+  /**
+    * Literal DOUBLE value.
+    *
+    * @param value literal DOUBLE value
+    */
+  def value(value: Double): LiteralValue = {
+    this.value = Option(value)
+    this
+  }
+
+  /**
+    * Literal FLOAT value.
+    *
+    * @param value literal FLOAT value
+    */
+  def value(value: Float): LiteralValue = {
+    this.value = Option(value)
+    this
+  }
+
+  /**
+    * Literal value either for an explicit VARCHAR type or automatically derived type.
+    *
+    * If no type is set, the type is automatically derived from the value. Currently,
+    * this is supported for: BOOLEAN, INT, DOUBLE, and VARCHAR.
+    *
+    * @param value literal value
+    */
+  def value(value: String): LiteralValue = {
+    this.value = Option(value)
+    this
+  }
+
+  /**
+    * Literal BIGINT value.
+    *
+    * @param value literal BIGINT value
+    */
+  def value(value: Long): LiteralValue = {
+    this.value = Option(value)
+    this
+  }
+
+  /**
+    * Literal TINYINT value.
+    *
+    * @param value literal TINYINT value
+    */
+  def value(value: Byte): LiteralValue = {
+    this.value = Option(value)
+    this
+  }
+
+  /**
+    * Literal SMALLINT value.
+    *
+    * @param value literal SMALLINT value
+    */
+  def value(value: Short): LiteralValue = {
+    this.value = Option(value)
+    this
+  }
+
+  /**
+    * Literal DECIMAL value.
+    *
+    * @param value literal DECIMAL value
+    */
+  def value(value: java.math.BigDecimal): LiteralValue = {
+    this.value = Option(value)
+    this
+  }
+
+  /**
+    * Internal method for properties conversion.
+    */
+  override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
+    addPropertiesWithPrefix(HierarchyDescriptorValidator.EMPTY_PREFIX, properties)
+  }
+
+  /**
+    * Internal method for properties conversion.
+    */
+  override private[flink] def addPropertiesWithPrefix(
+      keyPrefix: String,
+      properties: DescriptorProperties)
+    : Unit = {
+
+    typeInfo match {
+      // explicit type
+      case Some(ti) =>
+        properties.putString(keyPrefix + "type", ti)
+        value.foreach(v => properties.putString(keyPrefix + "value", String.valueOf(v)))
+      // implicit type
+      case None =>
+        // do not allow values in top-level
+        if (keyPrefix == HierarchyDescriptorValidator.EMPTY_PREFIX) {
+          throw new ValidationException(
+            "Literal values with implicit type must not exist in the top level 
of a hierarchy.")
+        }
+        value.foreach { v =>
+          properties.putString(keyPrefix.substring(0, keyPrefix.length - 1), String.valueOf(v))
+        }
+    }
+  }
+}
+
+/**
+  * Descriptor for a literal value. A literal value consists of a type and the actual value.
+  * Expression values are not allowed.
+  *
+  * If no type is set, the type is automatically derived from the value. Currently,
+  * this is supported for: BOOLEAN, INT, DOUBLE, and VARCHAR.
+  *
+  * Examples:
+  *   - "true", "false" -> BOOLEAN
+  *   - "42", "-5" -> INT
+  *   - "2.0", "1234.222" -> DOUBLE
+  *   - VARCHAR otherwise
+  */
+object LiteralValue {
+
+  /**
+    * Descriptor for a literal value. A literal value consists of a type and the actual value.
+    * Expression values are not allowed.
+    *
+    * If no type is set, the type is automatically derived from the value. Currently,
+    * this is supported for: BOOLEAN, INT, DOUBLE, and VARCHAR.
+    *
+    * Examples:
+    *   - "true", "false" -> BOOLEAN
+    *   - "42", "-5" -> INT
+    *   - "2.0", "1234.222" -> DOUBLE
+    *   - VARCHAR otherwise
+    */
+  def apply() = new LiteralValue()
+}
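
A usage sketch contrasting explicit and implicit typing (the values are illustrative):

    // explicit type: serialized as 'type' = 'INT' and 'value' = '42'
    val explicit = LiteralValue().of(Types.INT).value(42)

    // implicit type: serialized as a bare value and only valid when nested, e.g.
    // inside a ClassInstance constructor, where "2.0" is derived as DOUBLE
    val derived = LiteralValue().value("2.0")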
