[FLINK-8863] [sql-client] Add user-defined function support in SQL Client

This closes #6090.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8014dade
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8014dade
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8014dade

Branch: refs/heads/master
Commit: 8014dade4ba844786afa85fd517d6aba807dbc81
Parents: 79b38f8
Author: Xingcan Cui <xingc...@gmail.com>
Authored: Sun May 27 23:36:25 2018 +0800
Committer: Timo Walther <twal...@apache.org>
Committed: Tue Jul 10 13:55:21 2018 +0200

----------------------------------------------------------------------
 .../flink/table/client/config/Environment.java  |  20 ++
 .../client/config/UserDefinedFunction.java      |  98 +++++++
 .../client/gateway/local/ExecutionContext.java  |  44 +++
 .../gateway/local/LocalExecutorITCase.java      | 122 ++++++--
 .../gateway/utils/UserDefinedFunctions.java     | 105 +++++++
 .../src/test/resources/test-sql-client-udf.yaml |  82 ++++++
 .../flink/table/descriptors/ClassType.scala     | 123 ++++++++
 .../table/descriptors/ClassTypeValidator.scala  |  75 +++++
 .../descriptors/DescriptorProperties.scala      | 282 +++++++++++++++----
 .../table/descriptors/FunctionDescriptor.scala  |  52 ++++
 .../table/descriptors/FunctionValidator.scala   |  42 +++
 .../table/descriptors/HierarchyDescriptor.scala |  34 +++
 .../HierarchyDescriptorValidator.scala          |  35 +++
 .../flink/table/descriptors/PrimitiveType.scala |  55 ++++
 .../descriptors/PrimitiveTypeValidator.scala    | 146 ++++++++++
 .../descriptors/service/FunctionService.scala   |  87 ++++++
 .../flink/table/descriptors/ClassTypeTest.scala |  77 +++++
 .../flink/table/descriptors/FunctionTest.scala  |  65 +++++
 .../table/descriptors/PrimitiveTypeTest.scala   | 117 ++++++++
 19 files changed, 1591 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
index 0efb665..c1db4c1 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
@@ -46,10 +46,13 @@ public class Environment {
 
        private Deployment deployment;
 
+       private Map<String, UserDefinedFunction> functions;
+
        public Environment() {
                this.tables = Collections.emptyMap();
                this.execution = new Execution();
                this.deployment = new Deployment();
+               this.functions = Collections.emptyMap();
        }
 
        public Map<String, TableDescriptor> getTables() {
@@ -76,6 +79,14 @@ public class Environment {
                });
        }
 
+       public void setFunctions(List<Map<String, Object>> functions) {
+               this.functions = new HashMap<>(functions.size());
+               functions.forEach(config -> {
+                       final UserDefinedFunction f = UserDefinedFunction.create(config);
+                       this.functions.put(f.name(), f);
+               });
+       }
+
        public void setExecution(Map<String, Object> config) {
                this.execution = Execution.create(config);
        }
@@ -92,6 +103,10 @@ public class Environment {
                return deployment;
        }
 
+       public Map<String, UserDefinedFunction> getFunctions() {
+               return functions;
+       }
+
        @Override
        public String toString() {
                final StringBuilder sb = new StringBuilder();
@@ -136,6 +151,11 @@ public class Environment {
                tables.putAll(env2.getTables());
                mergedEnv.tables = tables;
 
+               // merge functions
+               final Map<String, UserDefinedFunction> functions = new HashMap<>(env1.getFunctions());
+               functions.putAll(env2.getFunctions());
+               mergedEnv.functions = functions;
+
                // merge execution properties
                mergedEnv.execution = Execution.merge(env1.getExecution(), env2.getExecution());
 

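For illustration, a minimal sketch of how one entry of a YAML 'functions'
list flows through Environment#setFunctions after this change. The map
mirrors what the YAML parser would hand over; com.example.MyScalarUDF is a
hypothetical class name:

    Map<String, Object> udf = new HashMap<>();
    udf.put("name", "scalarUDF");
    udf.put("from", "class");
    udf.put("class", "com.example.MyScalarUDF"); // hypothetical UDF class

    Environment env = new Environment();
    env.setFunctions(Collections.singletonList(udf));
    env.getFunctions().containsKey("scalarUDF"); // true: indexed by name
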
http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/UserDefinedFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/UserDefinedFunction.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/UserDefinedFunction.java
new file mode 100644
index 0000000..235f4ea
--- /dev/null
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/UserDefinedFunction.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.config;
+
+import org.apache.flink.table.client.SqlClientException;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.FunctionDescriptor;
+import org.apache.flink.table.descriptors.FunctionValidator;
+
+import java.util.Map;
+
+import static org.apache.flink.table.client.config.UserDefinedFunction.From.CLASS;
+
+/**
+ * Descriptor for user-defined functions.
+ */
+public class UserDefinedFunction extends FunctionDescriptor {
+
+       private static final String FROM = "from";
+
+       private From from;
+
+       private Map<String, String> properties;
+
+       private UserDefinedFunction(String name, From from, Map<String, String> properties) {
+               super(name);
+               this.from = from;
+               this.properties = properties;
+       }
+
+       /**
+        * Gets where the user-defined function should be created from.
+        */
+       public From getFrom() {
+               return from;
+       }
+
+       /**
+        * Creates a UDF descriptor with the given config.
+        */
+       public static UserDefinedFunction create(Map<String, Object> config) {
+               Map<String, String> udfConfig = ConfigUtil.normalizeYaml(config);
+               if (!udfConfig.containsKey(FunctionValidator.FUNCTION_NAME())) {
+                       throw new SqlClientException("The 'name' attribute of a function is missing.");
+               }
+
+               final String name = udfConfig.get(FunctionValidator.FUNCTION_NAME());
+               if (name.trim().isEmpty()) {
+                       throw new SqlClientException("Invalid function name '" + name + "'.");
+               }
+
+               // the default value is "CLASS"
+               From fromValue = CLASS;
+
+               if (udfConfig.containsKey(FROM)) {
+                       final String from = udfConfig.get(FROM);
+                       try {
+                               fromValue = From.valueOf(from.toUpperCase());
+                       } catch (IllegalArgumentException ex) {
+                               throw new SqlClientException("Unknown 'from' value '" + from + "'.");
+                       }
+               }
+
+               switch (fromValue) {
+                       case CLASS:
+                               return new UserDefinedFunction(name, fromValue, udfConfig);
+                       default:
+                               throw new SqlClientException("The 'from' attribute can only be 'class' for now.");
+               }
+       }
+
+       // --------------------------------------------------------------------------------------------
+
+       @Override
+       public void addProperties(DescriptorProperties properties) {
+               this.properties.forEach(properties::putString);
+       }
+
+       enum From {
+               CLASS
+       }
+}

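A sketch of the validation above: a config with an unsupported 'from'
source fails fast, since CLASS is the only member of the From enum so far:

    Map<String, Object> config = new HashMap<>();
    config.put("name", "badUDF");
    config.put("from", "jar"); // not a member of the From enum
    try {
        UserDefinedFunction.create(config);
    } catch (SqlClientException e) {
        // "Unknown 'from' value 'jar'."
    }
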
http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index c62faa4..0cd9738 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -42,11 +42,19 @@ import org.apache.flink.table.api.BatchQueryConfig;
 import org.apache.flink.table.api.QueryConfig;
 import org.apache.flink.table.api.StreamQueryConfig;
 import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
 import org.apache.flink.table.client.config.Deployment;
 import org.apache.flink.table.client.config.Environment;
 import org.apache.flink.table.client.gateway.SessionContext;
 import org.apache.flink.table.client.gateway.SqlExecutionException;
+import org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.table.descriptors.TableSourceDescriptor;
+import org.apache.flink.table.descriptors.service.FunctionService;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.functions.UserDefinedFunction;
 import org.apache.flink.table.sources.TableSource;
 import org.apache.flink.table.sources.TableSourceFactoryService;
 import org.apache.flink.util.FlinkException;
@@ -73,6 +81,7 @@ public class ExecutionContext<T> {
        private final List<URL> dependencies;
        private final ClassLoader classLoader;
        private final Map<String, TableSource<?>> tableSources;
+       private final Map<String, UserDefinedFunction> functions;
        private final Configuration flinkConfig;
        private final CommandLine commandLine;
        private final CustomCommandLine<T> activeCommandLine;
@@ -102,6 +111,16 @@ public class ExecutionContext<T> {
                        }
                });
 
+               // generate user-defined functions
+               functions = new HashMap<>();
+               mergedEnv.getFunctions().forEach((name, descriptor) -> {
+                       DescriptorProperties properties = new DescriptorProperties(true);
+                       descriptor.addProperties(properties);
+                       functions.put(
+                                       name,
+                                       FunctionService.generateUserDefinedFunction(properties, classLoader));
+               });
+
                // convert deployment options into command line options that describe a cluster
                commandLine = createCommandLine(mergedEnv.getDeployment(), commandLineOptions);
                activeCommandLine = findActiveCommandLine(availableCommandLines, commandLine);
@@ -207,6 +226,31 @@ public class ExecutionContext<T> {
 
                        // register table sources
                        tableSources.forEach(tableEnv::registerTableSource);
+
+                       // register UDFs
+                       if (tableEnv instanceof StreamTableEnvironment) {
+                               StreamTableEnvironment streamTableEnvironment = (StreamTableEnvironment) tableEnv;
+                               functions.forEach((k, v) -> {
+                                       if (v instanceof ScalarFunction) {
+                                               streamTableEnvironment.registerFunction(k, (ScalarFunction) v);
+                                       } else if (v instanceof AggregateFunction) {
+                                               streamTableEnvironment.registerFunction(k, (AggregateFunction<?, ?>) v);
+                                       } else if (v instanceof TableFunction) {
+                                               streamTableEnvironment.registerFunction(k, (TableFunction<?>) v);
+                                       }
+                               });
+                       } else {
+                               BatchTableEnvironment batchTableEnvironment = (BatchTableEnvironment) tableEnv;
+                               functions.forEach((k, v) -> {
+                                       if (v instanceof ScalarFunction) {
+                                               batchTableEnvironment.registerFunction(k, (ScalarFunction) v);
+                                       } else if (v instanceof AggregateFunction) {
+                                               batchTableEnvironment.registerFunction(k, (AggregateFunction<?, ?>) v);
+                                       } else if (v instanceof TableFunction) {
+                                               batchTableEnvironment.registerFunction(k, (TableFunction<?>) v);
+                                       }
+                               });
+                       }
                }
 
                public QueryConfig getQueryConfig() {

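The per-function instantiation above delegates to FunctionService. A hedged
sketch of the equivalent standalone call, assuming the Scala object is
reachable from Java via its static forwarder and that the descriptor
produced the flattened name/class keys shown elsewhere in this patch
(com.example.MyScalarUDF is hypothetical):

    DescriptorProperties properties = new DescriptorProperties(true);
    properties.putString("name", "scalarUDF");
    properties.putString("class", "com.example.MyScalarUDF"); // hypothetical
    UserDefinedFunction function =
            FunctionService.generateUserDefinedFunction(properties, classLoader);
    // The instance is then dispatched on its runtime type
    // (ScalarFunction / AggregateFunction / TableFunction) as in registerTables().
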
http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index 9a1eb08..b32353f 100644
--- a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -66,6 +66,7 @@ import static org.junit.Assert.assertTrue;
 public class LocalExecutorITCase extends TestLogger {
 
        private static final String DEFAULTS_ENVIRONMENT_FILE = "test-sql-client-defaults.yaml";
+       private static final String UDF_ENVIRONMENT_FILE = "test-sql-client-udf.yaml";
 
        private static final int NUM_TMS = 2;
        private static final int NUM_SLOTS_PER_TM = 2;
@@ -149,6 +150,68 @@ public class LocalExecutorITCase extends TestLogger {
        }
 
        @Test(timeout = 30_000L)
+       public void testScalarUDF() throws Exception {
+               final Executor executor = createDefaultExecutor(UDF_ENVIRONMENT_FILE, clusterClient);
+               final SessionContext session = new SessionContext("test-scalarUDF", new Environment());
+               final ResultDescriptor rd = executor.executeQuery(session, "SELECT scalarUDF(10)");
+               final List<String> actualResults = retrieveChangelogResult(executor, session, rd.getResultId());
+               final List<String> expectedResults = new ArrayList<>();
+               expectedResults.add("(true,15)");
+               TestBaseUtils.compareResultCollections(
+                               expectedResults, actualResults, Comparator.naturalOrder());
+       }
+
+       @Test(timeout = 30_000L)
+       public void testAggregateUDF() throws Exception {
+               final Executor executor = createDefaultExecutor(UDF_ENVIRONMENT_FILE, clusterClient);
+               final SessionContext session = new SessionContext("test-aggregateUDF", new Environment());
+               final ResultDescriptor rd = executor.executeQuery(session, "SELECT aggregateUDF(cast(1 as BIGINT))");
+               final List<String> actualResults = retrieveChangelogResult(executor, session, rd.getResultId());
+               final List<String> expectedResults = new ArrayList<>();
+               expectedResults.add("(true,100)");
+               TestBaseUtils.compareResultCollections(
+                               expectedResults, actualResults, Comparator.naturalOrder());
+       }
+
+       @Test(timeout = 30_000L)
+       public void testTableUDF() throws Exception {
+               final URL url = getClass().getClassLoader().getResource("test-data.csv");
+               Objects.requireNonNull(url);
+               final Map<String, String> replaceVars = new HashMap<>();
+               replaceVars.put("$VAR_0", url.getPath());
+               final Executor executor = createModifiedExecutor(UDF_ENVIRONMENT_FILE, clusterClient, replaceVars);
+
+               final SessionContext session = new SessionContext("test-tableUDF", new Environment());
+               final ResultDescriptor rd = executor.executeQuery(
+                               session,
+                               "SELECT w, l from TableNumber1, LATERAL TABLE(tableUDF(StringField1)) as T(w, l)");
+               final List<String> actualResults = retrieveChangelogResult(executor, session, rd.getResultId());
+               final List<String> expectedResults = new ArrayList<>();
+               expectedResults.add("(true,Hello,10)");
+               expectedResults.add("(true,World,10)");
+               expectedResults.add("(true,Hello,10)");
+               expectedResults.add("(true,World,10)");
+               expectedResults.add("(true,Hello,10)");
+               expectedResults.add("(true,World,10)");
+               expectedResults.add("(true,Hello,10)");
+               expectedResults.add("(true,World,10)");
+               expectedResults.add("(true,Hello,10)");
+               expectedResults.add("(true,World,10)");
+               expectedResults.add("(true,Hello,10)");
+               expectedResults.add("(true,World!!!!,14)");
+               TestBaseUtils.compareResultCollections(expectedResults, actualResults, Comparator.naturalOrder());
+       }
+
+       @Test(timeout = 30_000L)
        public void testStreamQueryExecutionChangelog() throws Exception {
                final URL url = getClass().getClassLoader().getResource("test-data.csv");
                Objects.requireNonNull(url);
@@ -167,20 +230,8 @@ public class LocalExecutorITCase extends TestLogger {
 
                        assertFalse(desc.isMaterialized());
 
-                       final List<String> actualResults = new ArrayList<>();
-
-                       while (true) {
-                               Thread.sleep(50); // slow the processing down
-                               final TypedResult<List<Tuple2<Boolean, Row>>> result =
-                                       executor.retrieveResultChanges(session, desc.getResultId());
-                               if (result.getType() == TypedResult.ResultType.PAYLOAD) {
-                                       for (Tuple2<Boolean, Row> change : result.getPayload()) {
-                                               actualResults.add(change.toString());
-                                       }
-                               } else if (result.getType() == TypedResult.ResultType.EOS) {
-                                       break;
-                               }
-                       }
+                       final List<String> actualResults =
+                                       retrieveChangelogResult(executor, session, desc.getResultId());
 
                        final List<String> expectedResults = new ArrayList<>();
                        expectedResults.add("(true,42,Hello World)");
@@ -266,19 +317,32 @@ public class LocalExecutorITCase extends TestLogger {
        }
 
        private <T> LocalExecutor createDefaultExecutor(ClusterClient<T> clusterClient) throws Exception {
+               return createDefaultExecutor(DEFAULTS_ENVIRONMENT_FILE, clusterClient);
+       }
+
+       private <T> LocalExecutor createDefaultExecutor(
+                       String configFileName,
+                       ClusterClient<T> clusterClient) throws Exception {
                return new LocalExecutor(
-                       EnvironmentFileUtil.parseModified(DEFAULTS_ENVIRONMENT_FILE, Collections.singletonMap("$VAR_2", "batch")),
+                       EnvironmentFileUtil.parseModified(configFileName, Collections.singletonMap("$VAR_2", "batch")),
                        Collections.emptyList(),
                        clusterClient.getFlinkConfiguration(),
                        new DummyCustomCommandLine<T>(clusterClient));
        }
 
        private <T> LocalExecutor createModifiedExecutor(ClusterClient<T> clusterClient, Map<String, String> replaceVars) throws Exception {
+               return createModifiedExecutor(DEFAULTS_ENVIRONMENT_FILE, clusterClient, replaceVars);
+       }
+
+       private <T> LocalExecutor createModifiedExecutor(
+                       String configFileName,
+                       ClusterClient<T> clusterClient,
+                       Map<String, String> replaceVars) throws Exception {
                return new LocalExecutor(
-                       EnvironmentFileUtil.parseModified(DEFAULTS_ENVIRONMENT_FILE, replaceVars),
-                       Collections.emptyList(),
-                       clusterClient.getFlinkConfiguration(),
-                       new DummyCustomCommandLine<T>(clusterClient));
+                               EnvironmentFileUtil.parseModified(configFileName, replaceVars),
+                               Collections.emptyList(),
+                               clusterClient.getFlinkConfiguration(),
+                               new DummyCustomCommandLine<T>(clusterClient));
        }
 
        private List<String> retrieveTableResult(
@@ -304,4 +368,24 @@ public class LocalExecutorITCase extends TestLogger {
 
                return actualResults;
        }
+
+       private List<String> retrieveChangelogResult(
+                       Executor executor,
+                       SessionContext session,
+                       String resultID) throws InterruptedException {
+               final List<String> actualResults = new ArrayList<>();
+               while (true) {
+                       Thread.sleep(50); // slow the processing down
+                       final TypedResult<List<Tuple2<Boolean, Row>>> result =
+                                       executor.retrieveResultChanges(session, resultID);
+                       if (result.getType() == TypedResult.ResultType.PAYLOAD) {
+                               for (Tuple2<Boolean, Row> change : result.getPayload()) {
+                                       actualResults.add(change.toString());
+                               }
+                       } else if (result.getType() == TypedResult.ResultType.EOS) {
+                               break;
+                       }
+               }
+               return actualResults;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/UserDefinedFunctions.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/UserDefinedFunctions.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/UserDefinedFunctions.java
new file mode 100644
index 0000000..b93ef66
--- /dev/null
+++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/UserDefinedFunctions.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway.utils;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.Types;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.types.Row;
+
+/**
+ * A collection of user-defined functions for SQL Client tests.
+ */
+public class UserDefinedFunctions {
+
+       /**
+        * Scalar function for SQL Client tests.
+        */
+       public static class ScalarUDF extends ScalarFunction {
+
+               private int offset;
+
+               public ScalarUDF(Integer offset) {
+                       this.offset = offset;
+               }
+
+               public String eval(Integer i) {
+                       return String.valueOf(i + offset);
+               }
+       }
+
+       /**
+        * Aggregate function for SQL Client tests.
+        */
+       public static class AggregateUDF extends AggregateFunction<Long, Long> {
+
+               public AggregateUDF(String name, Boolean flag, Integer value) {
+
+               }
+
+               @Override
+               public Long createAccumulator() {
+                       return 0L;
+               }
+
+               @Override
+               public Long getValue(Long accumulator) {
+                       return 100L;
+               }
+
+               public void accumulate(Long acc, Long value) {
+
+               }
+
+               @Override
+               public TypeInformation<Long> getResultType() {
+                       return BasicTypeInfo.LONG_TYPE_INFO;
+               }
+       }
+
+       /**
+        * Table function for SQL Client tests.
+        */
+       public static class TableUDF extends TableFunction<Row> {
+               private long extra = 2L;
+
+               public TableUDF(Long extra) {
+                       this.extra = extra;
+               }
+
+               public void eval(String str) {
+                       for (String s : str.split(" ")) {
+                               Row r = new Row(2);
+                               r.setField(0, s);
+                               r.setField(1, s.length() + extra);
+                               collect(r);
+                       }
+               }
+
+               @Override
+               public TypeInformation<Row> getResultType() {
+                       return Types.ROW(Types.STRING(), Types.LONG());
+               }
+       }
+
+}
+

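For orientation, how these test UDFs behave when registered directly; a
hedged sketch assuming a StreamTableEnvironment tEnv. ScalarUDF constructed
with offset 5 maps 10 to "15", which matches the expected "(true,15)"
changelog row in LocalExecutorITCase:

    tEnv.registerFunction("scalarUDF", new UserDefinedFunctions.ScalarUDF(5));
    Table result = tEnv.sqlQuery("SELECT scalarUDF(10)"); // single row: "15"
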
http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-udf.yaml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-udf.yaml b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-udf.yaml
new file mode 100644
index 0000000..3e16030
--- /dev/null
+++ b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-udf.yaml
@@ -0,0 +1,82 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+#==============================================================================
+# TEST ENVIRONMENT FILE
+# Intended for org.apache.flink.table.client.gateway.local.LocalExecutorITCase.
+#==============================================================================
+
+# this file has variables that can be filled with content by replacing $VAR_XXX
+
+tables:
+  - name: TableNumber1
+    type: source
+    schema:
+      - name: IntegerField1
+        type: INT
+      - name: StringField1
+        type: VARCHAR
+    connector:
+      type: filesystem
+      path: "$VAR_0"
+    format:
+      type: csv
+      fields:
+        - name: IntegerField1
+          type: INT
+        - name: StringField1
+          type: VARCHAR
+      line-delimiter: "\n"
+      comment-prefix: "#"
+
+functions:
+  - name: scalarUDF
+    from: class
+    class: org.apache.flink.table.client.gateway.utils.UserDefinedFunctions$ScalarUDF
+    constructor:
+      - 5
+
+  - name: aggregateUDF
+    from: class
+    class: org.apache.flink.table.client.gateway.utils.UserDefinedFunctions$AggregateUDF
+    constructor:
+      - StarryName
+      - false
+      - class: java.lang.Integer
+        constructor:
+          - class: java.lang.String
+            constructor:
+              - type: VARCHAR
+                value: 3
+
+  - name: tableUDF
+    from: class
+    class: org.apache.flink.table.client.gateway.utils.UserDefinedFunctions$TableUDF
+    constructor:
+      - type: LONG
+        value: 5
+
+execution:
+  type: streaming
+  parallelism: 1
+  result-mode: changelog
+
+deployment:
+  response-timeout: 5000
+
+

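The tableUDF entry above can equivalently be declared programmatically with
the descriptor API introduced by this patch; a hedged sketch using the
Scala fluent API from Java:

    FunctionDescriptor tableUDF = new FunctionDescriptor("tableUDF")
            .using(ClassType.apply(
                    "org.apache.flink.table.client.gateway.utils.UserDefinedFunctions$TableUDF")
                    .param("LONG", "5")); // type string + value string, as in the YAML
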
http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ClassType.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ClassType.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ClassType.scala
new file mode 100644
index 0000000..78ceb3a
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ClassType.scala
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.typeutils.TypeStringUtils
+
+import scala.collection.mutable.ArrayBuffer
+
+/**
+  * Descriptor for a class type.
+  *
+  * @param className the full name of the class (e.g., java.lang.Integer)
+  */
+class ClassType(var className: Option[String] = None)
+  extends HierarchyDescriptor {
+
+  // the parameter is either a primitive type or a class type
+  private val constructor: ArrayBuffer[Either[PrimitiveType[_], ClassType]] =
+    ArrayBuffer()
+
+  /**
+    * Sets the class name for the descriptor.
+    */
+  def of(name: String): ClassType = {
+    this.className = Option(name)
+    this
+  }
+
+  /**
+    * Adds the given string-formatted value as a parameter, the type of which will be
+    * automatically derived (e.g., "true" -> Boolean, "1" -> Integer, "2.0" -> Double
+    * and "abc" -> String).
+    */
+  def strParam(valueStr: String): ClassType = {
+    val typeString = PrimitiveTypeValidator.deriveTypeStrFromValueStr(valueStr)
+  }
+
+  /**
+    * Adds the given string-formatted value as a parameter, the type of which will be
+    * decided by the given type string (e.g., "DOUBLE", "VARCHAR").
+    */
+  def param(typeStr: String, valueStr: String): ClassType = {
+    param(TypeStringUtils.readTypeInfo(typeStr), valueStr)
+  }
+
+  /**
+    * Adds the given string-formatted value as a parameter, the type of which will be defined by the
+    * given type information.
+    */
+  def param[T](typeInfo: TypeInformation[T], valueStr: String): ClassType = {
+    constructor += Left(
+      new PrimitiveType[T]()
+        .of(typeInfo)
+        .value(PrimitiveTypeValidator.deriveTypeAndValueStr(typeInfo, valueStr)))
+    this
+  }
+
+  /**
+    * Adds the given value as a parameter, the type of which will be automatically derived.
+    */
+  def param[T](value: T): ClassType = {
+    constructor += Left(
+      new PrimitiveType[T]()
+        .of(TypeInformation.of(value.getClass.asInstanceOf[Class[T]]))
+        .value(value))
+    this
+  }
+
+  /**
+    * Adds a parameter defined by the given class type descriptor.
+    */
+  def param(field: ClassType): ClassType = {
+    constructor += Right(field)
+    this
+  }
+
+  override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
+    addPropertiesWithPrefix("", properties)
+  }
+
+  override private[flink] def addPropertiesWithPrefix(
+      keyPrefix: String,
+      properties: DescriptorProperties): Unit = {
+    className.foreach(properties.putString(s"$keyPrefix${ClassTypeValidator.CLASS}", _))
+    var i = 0
+    while (i < constructor.size) {
+      constructor(i) match {
+        case Left(basicType) =>
+          basicType.addPropertiesWithPrefix(
+            s"$keyPrefix${ClassTypeValidator.CONSTRUCTOR}.$i.",
+            properties)
+        case Right(classType) =>
+          classType.addPropertiesWithPrefix(
+            s"$keyPrefix${ClassTypeValidator.CONSTRUCTOR}.$i.",
+            properties)
+      }
+      i += 1
+    }
+  }
+}
+
+object ClassType {
+  def apply() = new ClassType
+  def apply(className: String) = new ClassType(Option(className))
+}

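A hedged sketch of nested constructor parameters, mirroring the
aggregateUDF YAML entry in the SQL Client test resources; the flattened
keys in the comment are illustrative and com.example.AggregateUDF is a
hypothetical class:

    ClassType inner = ClassType.apply("java.lang.Integer")
            .param(ClassType.apply("java.lang.String").param("VARCHAR", "3"));
    ClassType udf = ClassType.apply("com.example.AggregateUDF") // hypothetical
            .strParam("StarryName") // derived as VARCHAR
            .strParam("false")      // derived as BOOLEAN
            .param(inner);
    // Conversion yields keys such as:
    //   class = com.example.AggregateUDF
    //   constructor.0.type = VARCHAR, constructor.0.value = StarryName
    //   constructor.2.class = java.lang.Integer, constructor.2.constructor.0.class = ...
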
http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ClassTypeValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ClassTypeValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ClassTypeValidator.scala
new file mode 100644
index 0000000..ebb48b5
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ClassTypeValidator.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.ValidationException
+
+import scala.collection.JavaConversions._
+
+/**
+  * Validator for [[ClassType]].
+  */
+class ClassTypeValidator extends HierarchyDescriptorValidator {
+  override def validateWithPrefix(keyPrefix: String, properties: DescriptorProperties): Unit = {
+
+    properties.validateString(s"$keyPrefix${ClassTypeValidator.CLASS}", isOptional = false)
+
+    val constructorPrefix = s"$keyPrefix${ClassTypeValidator.CONSTRUCTOR}"
+    normalizeConstructorParams(constructorPrefix, properties)
+
+    val constructorProps =
+      properties.getVariableIndexedProperties(constructorPrefix, List())
+    var i = 0
+    val primitiveTypeValidator = new PrimitiveTypeValidator
+    while (i < constructorProps.size()) {
+      if (constructorProps(i).containsKey(PrimitiveTypeValidator.PRIMITIVE_TYPE)) {
+        primitiveTypeValidator.validateWithPrefix(s"$constructorPrefix.$i.", properties)
+      } else if (constructorProps(i).containsKey(ClassTypeValidator.CLASS)) {
+        validateWithPrefix(s"$constructorPrefix.$i.", properties)
+      } else {
+        throw ValidationException("A constructor field must contain a 'class' or a 'type' key.")
+      }
+      i += 1
+    }
+  }
+
+  /**
+    * For each constructor parameter (e.g., constructor.0 = abc), we derive its type and
+    * replace it with the normalized form (e.g., constructor.0.type = VARCHAR,
+    * constructor.0.value = abc).
+    *
+    * @param constructorPrefix the prefix to get the constructor parameters
+    * @param properties the descriptor properties
+    */
+  def normalizeConstructorParams(
+      constructorPrefix: String,
+      properties: DescriptorProperties): Unit = {
+    val constructorValues = properties.getListProperties(constructorPrefix)
+    constructorValues.foreach(kv => {
+      properties.unsafeRemove(kv._1)
+      val tp = PrimitiveTypeValidator.deriveTypeStrFromValueStr(kv._2)
+      properties.putString(s"${kv._1}.${PrimitiveTypeValidator.PRIMITIVE_TYPE}", tp)
+      properties.putString(s"${kv._1}.${PrimitiveTypeValidator.PRIMITIVE_VALUE}", kv._2)
+    })
+  }
+}
+
+object ClassTypeValidator {
+  val CLASS = "class"
+  val CONSTRUCTOR = "constructor"
+}

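A sketch of the normalization step in isolation, given a DescriptorProperties
populated from YAML:

    DescriptorProperties props = new DescriptorProperties(true);
    props.putString("constructor.0", "abc");
    new ClassTypeValidator().normalizeConstructorParams("constructor", props);
    // The flat entry is expanded before validation:
    //   constructor.0.type  = VARCHAR
    //   constructor.0.value = abc
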
http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
index 4b0b60d..8d410d0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
@@ -19,7 +19,8 @@
 package org.apache.flink.table.descriptors
 
 import java.io.Serializable
-import java.lang.{Boolean => JBoolean, Double => JDouble, Integer => JInt, Long => JLong}
+import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float => JFloat, Integer => JInt, Long => JLong, Short => JShort}
+import java.math.{BigDecimal => JBigDecimal}
 import java.util
 import java.util.function.{Consumer, Supplier}
 import java.util.regex.Pattern
@@ -238,10 +239,25 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
   }
 
   /**
+    * Returns a big decimal value under the given key if it exists.
+    */
+  def getOptionalBigDecimal(key: String): Optional[JBigDecimal] = {
+    val value = properties.get(key).map(new JBigDecimal(_))
+    toJava(value)
+  }
+
+  /**
+    * Returns a big decimal value under the given existing key.
+    */
+  def getBigDecimal(key: String): BigDecimal = {
+    getOptionalBigDecimal(key).orElseThrow(exceptionSupplier(key))
+  }
+
+  /**
     * Returns a boolean value under the given key if it exists.
     */
   def getOptionalBoolean(key: String): Optional[JBoolean] = {
-    val value = properties.get(key).map(JBoolean.parseBoolean(_)).map(Boolean.box)
+    val value = properties.get(key).map(JBoolean.parseBoolean).map(Boolean.box)
     toJava(value)
   }
 
@@ -253,10 +269,55 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
   }
 
   /**
+    * Returns a byte value under the given key if it exists.
+    */
+  def getOptionalByte(key: String): Optional[JByte] = {
+    val value = properties.get(key).map(JByte.parseByte).map(Byte.box)
+    toJava(value)
+  }
+
+  /**
+    * Returns a byte value under the given existing key.
+    */
+  def getByte(key: String): Byte = {
+    getOptionalByte(key).orElseThrow(exceptionSupplier(key))
+  }
+
+  /**
+    * Returns a double value under the given key if it exists.
+    */
+  def getOptionalDouble(key: String): Optional[JDouble] = {
+    val value = properties.get(key).map(JDouble.parseDouble).map(Double.box)
+    toJava(value)
+  }
+
+  /**
+    * Returns a double value under the given existing key.
+    */
+  def getDouble(key: String): Double = {
+    getOptionalDouble(key).orElseThrow(exceptionSupplier(key))
+  }
+
+  /**
+    * Returns a float value under the given key if it exists.
+    */
+  def getOptionalFloat(key: String): Optional[JFloat] = {
+    val value = properties.get(key).map(JFloat.parseFloat).map(Float.box)
+    toJava(value)
+  }
+
+  /**
+    * Returns a float value under the given existing key.
+    */
+  def getFloat(key: String): Float = {
+    getOptionalFloat(key).orElseThrow(exceptionSupplier(key))
+  }
+
+  /**
     * Returns an integer value under the given key if it exists.
     */
   def getOptionalInt(key: String): Optional[JInt] = {
-    val value = properties.get(key).map(JInt.parseInt(_)).map(Int.box)
+    val value = properties.get(key).map(JInt.parseInt).map(Int.box)
     toJava(value)
   }
 
@@ -271,7 +332,7 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
     * Returns a long value under the given key if it exists.
     */
   def getOptionalLong(key: String): Optional[JLong] = {
-    val value = properties.get(key).map(JLong.parseLong(_)).map(Long.box)
+    val value = properties.get(key).map(JLong.parseLong).map(Long.box)
     toJava(value)
   }
 
@@ -283,18 +344,18 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
   }
 
   /**
-    * Returns a double value under the given key if it exists.
+    * Returns a short value under the given key if it exists.
     */
-  def getOptionalDouble(key: String): Optional[JDouble] = {
-    val value = properties.get(key).map(JDouble.parseDouble(_)).map(Double.box)
+  def getOptionalShort(key: String): Optional[JShort] = {
+    val value = properties.get(key).map(JShort.parseShort).map(Short.box)
     toJava(value)
   }
 
   /**
-    * Returns a double value under the given key if it exists.
+    * Returns a short value under the given existing key.
     */
-  def getDouble(key: String): Double = {
-    getOptionalDouble(key).orElseThrow(exceptionSupplier(key))
+  def getShort(key: String): Short = {
+    getOptionalShort(key).orElseThrow(exceptionSupplier(key))
   }
 
   /**
@@ -470,6 +531,16 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
   }
 
   /**
+    * Returns all properties under a group of array formed keys.
+    *
+    * E.g. constructor -> returns all constructor.# properties.
+    */
+  def getListProperties(key: String): JMap[String, String] = {
+    val escapedKey = Pattern.quote(key)
+    properties.filterKeys(k => k.matches(s"$escapedKey\\.\\d+")).asJava
+  }
+
+  /**
    * Returns all properties under a given key that contains an index in between.
     *
     * E.g. rowtime.0.name -> returns all rowtime.#.name properties
@@ -566,24 +637,7 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
       min: Int, // inclusive
       max: Int) // inclusive
     : Unit = {
-
-    if (!properties.contains(key)) {
-      if (!isOptional) {
-        throw new ValidationException(s"Could not find required property '$key'.")
-      }
-    } else {
-      try {
-        val value = Integer.parseInt(properties(key))
-        if (value < min || value > max) {
-          throw new ValidationException(s"Property '$key' must be an integer value between $min " +
-            s"and $max but was: ${properties(key)}")
-        }
-      } catch {
-        case _: NumberFormatException =>
-          throw new ValidationException(
-            s"Property '$key' must be an integer value but was: ${properties(key)}")
-      }
-    }
+    validateComparable(key, isOptional, new Integer(min), new Integer(max), Integer.valueOf)
   }
 
   /**
@@ -616,24 +670,7 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
       min: Long, // inclusive
       max: Long) // inclusive
     : Unit = {
-
-    if (!properties.contains(key)) {
-      if (!isOptional) {
-        throw new ValidationException(s"Could not find required property '$key'.")
-      }
-    } else {
-      try {
-        val value = JLong.parseLong(properties(key))
-        if (value < min || value > max) {
-          throw new ValidationException(s"Property '$key' must be a long value between $min " +
-            s"and $max but was: ${properties(key)}")
-        }
-      } catch {
-        case _: NumberFormatException =>
-          throw new ValidationException(
-            s"Property '$key' must be a long value but was: ${properties(key)}")
-      }
-    }
+    validateComparable(key, isOptional, new JLong(min), new JLong(max), JLong.valueOf)
   }
 
   /**
@@ -699,22 +736,165 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
       min: Double, // inclusive
       max: Double) // inclusive
     : Unit = {
+    validateComparable(key, isOptional, new JDouble(min), new JDouble(max), JDouble.valueOf)
+  }
 
+  /**
+    * Validates a big decimal property.
+    */
+  def validateBigDecimal(
+      key: String,
+      isOptional: Boolean): Unit = {
+
+    if (!properties.contains(key)) {
+      if (!isOptional) {
+        throw new ValidationException(s"Could not find required property '$key'.")
+      }
+    } else {
+      try {
+        new JBigDecimal(properties(key))
+      } catch {
+        case _: NumberFormatException =>
+          throw new ValidationException(
+            s"Property '$key' must be a big decimal value but was: ${properties(key)}")
+      }
+    }
+  }
+
+  /**
+    * Validates a big decimal property. The boundaries are inclusive.
+    */
+  def validateBigDecimal(
+      key: String,
+      isOptional: Boolean,
+      min: BigDecimal, // inclusive
+      max: BigDecimal) // inclusive
+  : Unit = {
+    validateComparable(
+      key,
+      isOptional,
+      min.bigDecimal,
+      max.bigDecimal,
+      (value: String) => new JBigDecimal(value))
+  }
+
+  /**
+    * Validates a byte property.
+    */
+  def validateByte(
+      key: String,
+      isOptional: Boolean): Unit = validateByte(key, isOptional, Byte.MinValue, Byte.MaxValue)
+
+  /**
+    * Validates a byte property. The boundaries are inclusive.
+    */
+  def validateByte(
+      key: String,
+      isOptional: Boolean,
+      min: Byte) // inclusive
+  : Unit = {
+    validateByte(key, isOptional, min, Byte.MaxValue)
+  }
+
+  /**
+    * Validates a byte property. The boundaries are inclusive.
+    */
+  def validateByte(
+      key: String,
+      isOptional: Boolean,
+      min: Byte, // inclusive
+      max: Byte) // inclusive
+  : Unit = {
+    validateComparable(key, isOptional, new JByte(min), new JByte(max), JByte.valueOf)
+  }
+
+  /**
+    * Validates a float property.
+    */
+  def validateFloat(
+      key: String,
+      isOptional: Boolean): Unit = validateFloat(key, isOptional, Float.MinValue, Float.MaxValue)
+
+  /**
+    * Validates a float property. The boundaries are inclusive.
+    */
+  def validateFloat(
+      key: String,
+      isOptional: Boolean,
+      min: Float) // inclusive
+  : Unit = {
+    validateFloat(key, isOptional, min, Float.MaxValue)
+  }
+
+  /**
+    * Validates a float property. The boundaries are inclusive.
+    */
+  def validateFloat(
+      key: String,
+      isOptional: Boolean,
+      min: Float, // inclusive
+      max: Float) // inclusive
+  : Unit = {
+    validateComparable(key, isOptional, new JFloat(min), new JFloat(max), JFloat.valueOf)
+  }
+
+  /**
+    * Validates a short property.
+    */
+  def validateShort(
+      key: String,
+      isOptional: Boolean): Unit = validateShort(key, isOptional, Short.MinValue, Short.MaxValue)
+
+  /**
+    * Validates a short property. The boundaries are inclusive.
+    */
+  def validateShort(
+      key: String,
+      isOptional: Boolean,
+      min: Short) // inclusive
+  : Unit = {
+    validateShort(key, isOptional, min, Short.MaxValue)
+  }
+
+  /**
+    * Validates a short property. The boundaries are inclusive.
+    */
+  def validateShort(
+      key: String,
+      isOptional: Boolean,
+      min: Short, // inclusive
+      max: Short) // inclusive
+  : Unit = {
+    validateComparable(key, isOptional, new JShort(min), new JShort(max), JShort.valueOf)
+  }
+
+  /**
+    * Validates a property by first parsing the string value to a comparable object.
+    * The boundaries are inclusive.
+    */
+  private def validateComparable[T <: Comparable[T]](
+      key: String,
+      isOptional: Boolean,
+      min: T,
+      max: T,
+      parseFunction: String => T)
+  : Unit = {
     if (!properties.contains(key)) {
       if (!isOptional) {
         throw new ValidationException(s"Could not find required property '$key'.")
       }
     } else {
       try {
-        val value = JDouble.parseDouble(properties(key))
-        if (value < min || value > max) {
-          throw new ValidationException(s"Property '$key' must be a double value between $min " +
-            s"and $max but was: ${properties(key)}")
+        val value = parseFunction(properties(key))
+
+        if (value.compareTo(min) < 0 || value.compareTo(max) > 0) {
+          throw new ValidationException(s"Property '$key' must be a ${min.getClass.getSimpleName}" +
+            s" value between $min and $max but was: ${properties(key)}")
         }
       } catch {
         case _: NumberFormatException =>
           throw new ValidationException(
-            s"Property '$key' must be an double value but was: ${properties(key)}")
+            s"Property '$key' must be a ${min.getClass.getSimpleName} value but was: ${properties(key)}")
       }
     }
   }

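A usage sketch of the new getters and the validateComparable-backed
validators (Scala API called from Java; the keys and values are
illustrative):

    DescriptorProperties props = new DescriptorProperties(true);
    props.putString("offset", "42");
    props.validateByte("offset", false);             // parse + range check
    byte offset = props.getByte("offset");           // 42

    props.putString("ratio", "0.5");
    props.validateFloat("ratio", false, 0.0f, 1.0f); // inclusive boundaries
    float ratio = props.getFloat("ratio");
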
http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FunctionDescriptor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FunctionDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FunctionDescriptor.scala
new file mode 100644
index 0000000..f4c8363
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FunctionDescriptor.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+/**
+  * Descriptor for describing a function that can be instantiated from a given source
+  * (e.g., a class).
+  *
+  * @param name name of the function
+  */
+class FunctionDescriptor(var name: String) extends Descriptor {
+
+  var classDescriptor: Option[ClassType] = None
+
+  /**
+    * Uses the class provided by the descriptor to instantiate the function.
+    */
+  def using(classDescriptor: ClassType): FunctionDescriptor = {
+    this.classDescriptor = Option(classDescriptor)
+    this
+  }
+
+  def getDescriptorProperties: DescriptorProperties = {
+    val descriptorProperties = new DescriptorProperties()
+    addProperties(descriptorProperties)
+    descriptorProperties
+  }
+
+  override def addProperties(properties: DescriptorProperties): Unit = {
+    properties.putString(FunctionValidator.FUNCTION_NAME, name)
+    classDescriptor.foreach(_.addProperties(properties))
+  }
+}
+
+object FunctionDescriptor {
+  def apply(name: String): FunctionDescriptor = new FunctionDescriptor(name)
+}

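Putting the pieces together, a hedged sketch of declaring and validating a
function programmatically (com.example.MyScalarUDF is hypothetical):

    FunctionDescriptor function = FunctionDescriptor.apply("myUdf")
            .using(ClassType.apply("com.example.MyScalarUDF").strParam("5"));
    DescriptorProperties props = function.getDescriptorProperties();
    new FunctionValidator().validate(props); // checks 'name' plus class/constructor keys
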
http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FunctionValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FunctionValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FunctionValidator.scala
new file mode 100644
index 0000000..f36b9c2
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FunctionValidator.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.functions.UserDefinedFunction
+
+import scala.collection.JavaConverters._
+
+/**
+  * Validator for [[FunctionDescriptor]].
+  */
+class FunctionValidator extends DescriptorValidator {
+
+  override def validate(properties: DescriptorProperties): Unit = {
+    properties.validateString(FunctionValidator.FUNCTION_NAME, isOptional = false, 1)
+    new ClassTypeValidator().validate(properties)
+  }
+}
+
+object FunctionValidator {
+
+  val FUNCTION_NAME = "name"
+
+}
+
+
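
A short sketch of driving the validator directly, reusing the property keys
exercised by FunctionTest further below:

    import org.apache.flink.table.descriptors.{DescriptorProperties, FunctionValidator}

    val props = new DescriptorProperties()
    props.putString("name", "func1")
    props.putString("class", "my.class")

    // Verifies that "name" is a non-empty string, then delegates the
    // class-related keys to ClassTypeValidator.
    new FunctionValidator().validate(props)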

http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptor.scala
new file mode 100644
index 0000000..f97c807
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptor.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+/**
+  * A descriptor that may exist at an arbitrary level (i.e., be recursively included by other
+  * descriptors).
+  */
+abstract class HierarchyDescriptor extends Descriptor {
+
+  /**
+    * Internal method for properties conversion. All the property keys will be prefixed according
+    * to the level.
+    */
+  private[flink] def addPropertiesWithPrefix(
+    keyPrefix: String, properties: DescriptorProperties): Unit
+
+}
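
As a sketch of the contract, a hypothetical single-property descriptor could
implement the two methods as follows, assuming it lives in an
org.apache.flink package since the methods are private[flink] (PrimitiveType
below is the real in-tree example):

    import org.apache.flink.table.descriptors.{DescriptorProperties, HierarchyDescriptor}

    class VersionDescriptor(version: String) extends HierarchyDescriptor {

      // Top-level entry point: no prefix.
      override private[flink] def addProperties(properties: DescriptorProperties): Unit =
        addPropertiesWithPrefix("", properties)

      // Nested at level "constructor.0." this writes "constructor.0.version".
      override private[flink] def addPropertiesWithPrefix(
          keyPrefix: String, properties: DescriptorProperties): Unit =
        properties.putString(keyPrefix + "version", version)
    }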

http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptorValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptorValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptorValidator.scala
new file mode 100644
index 0000000..73dd1f9
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptorValidator.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+/**
+  * Validator for a [[HierarchyDescriptor]].
+  */
+trait HierarchyDescriptorValidator extends DescriptorValidator {
+
+  def validate(properties: DescriptorProperties): Unit = {
+    validateWithPrefix("", properties)
+  }
+
+  /**
+    * Performs validation with a prefix for the keys.
+    */
+  def validateWithPrefix(keyPrefix: String, properties: DescriptorProperties): Unit
+
+}
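
Continuing the hypothetical VersionDescriptor sketch from above, its
validator would route all checks through validateWithPrefix:

    class VersionValidator extends HierarchyDescriptorValidator {

      // Invoked with "" at the top level and with, e.g., "constructor.0."
      // when the validated descriptor is nested inside another one.
      override def validateWithPrefix(
          keyPrefix: String, properties: DescriptorProperties): Unit = {
        properties.validateString(s"${keyPrefix}version", isOptional = false, 1)
      }
    }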

http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/PrimitiveType.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/PrimitiveType.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/PrimitiveType.scala
new file mode 100644
index 0000000..9e5fcab
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/PrimitiveType.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.typeutils.TypeStringUtils
+
+/**
+  * Descriptor for a primitive type. For internal use only.
+  */
+class PrimitiveType[T] extends HierarchyDescriptor {
+
+  var typeInformation: TypeInformation[T] = _
+  var value: T = _
+
+  def of(basicType: TypeInformation[T]): PrimitiveType[T] = {
+    typeInformation = basicType
+    this
+  }
+
+  def value(value: T): PrimitiveType[T] = {
+    this.value = value
+    this
+  }
+
+  override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
+    addPropertiesWithPrefix("", properties)
+  }
+
+  override private[flink] def addPropertiesWithPrefix(
+      keyPrefix: String, properties: DescriptorProperties): Unit = {
+    properties.putString(keyPrefix + "type", TypeStringUtils.writeTypeInfo(typeInformation))
+    properties.putString(keyPrefix + "value", value.toString)
+  }
+}
+
+object PrimitiveType {
+  def apply[T]() = new PrimitiveType[T]()
+}
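
Under a nesting prefix, the descriptor emits exactly the two keys its
validator checks. A sketch reusing values from PrimitiveTypeTest below,
again assuming code in an org.apache.flink package:

    import org.apache.flink.table.api.Types
    import org.apache.flink.table.descriptors.{DescriptorProperties, PrimitiveType}

    val intDesc = PrimitiveType().of(Types.INT).value(9)

    val props = new DescriptorProperties()
    intDesc.addPropertiesWithPrefix("constructor.0.", props)
    // props now holds:
    //   constructor.0.type  -> INT
    //   constructor.0.value -> 9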

http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/PrimitiveTypeValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/PrimitiveTypeValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/PrimitiveTypeValidator.scala
new file mode 100644
index 0000000..9b2f776
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/PrimitiveTypeValidator.scala
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float => JFloat, Integer => JInt, Long => JLong, Short => JShort}
+import java.math.{BigDecimal => JBigDecimal}
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.{TableException, Types}
+import org.apache.flink.table.typeutils.TypeStringUtils
+
+/**
+  * Validator for [[PrimitiveType]].
+  */
+class PrimitiveTypeValidator extends HierarchyDescriptorValidator {
+
+  /*
+   * TODO The following types need to be supported next.
+   * Types.SQL_DATE
+   * Types.SQL_TIME
+   * Types.SQL_TIMESTAMP
+   * Types.INTERVAL_MONTHS
+   * Types.INTERVAL_MILLIS
+   * Types.PRIMITIVE_ARRAY
+   * Types.OBJECT_ARRAY
+   * Types.MAP
+   * Types.MULTISET
+   */
+
+  override def validateWithPrefix(keyPrefix: String, properties: DescriptorProperties): Unit = {
+    val typeKey = s"$keyPrefix${PrimitiveTypeValidator.PRIMITIVE_TYPE}"
+    val valueKey = s"$keyPrefix${PrimitiveTypeValidator.PRIMITIVE_VALUE}"
+
+    properties.validateType(typeKey, isOptional = false)
+
+    val typeInfo: TypeInformation[_] =
+      properties.getType(s"$keyPrefix${PrimitiveTypeValidator.PRIMITIVE_TYPE}")
+    typeInfo match {
+      case Types.DECIMAL => properties.validateBigDecimal(valueKey, isOptional = false)
+      case Types.BOOLEAN => properties.validateBoolean(valueKey, isOptional = false)
+      case Types.BYTE => properties.validateByte(valueKey, isOptional = false)
+      case Types.DOUBLE => properties.validateDouble(valueKey, isOptional = false)
+      case Types.FLOAT => properties.validateFloat(valueKey, isOptional = false)
+      case Types.INT => properties.validateInt(valueKey, isOptional = false)
+      case Types.LONG => properties.validateLong(valueKey, isOptional = false)
+      case Types.SHORT => properties.validateShort(valueKey, isOptional = false)
+      case Types.STRING => properties.validateString(valueKey, isOptional = false)
+      case _ => throw TableException(s"Unsupported type ${typeInfo.getTypeClass}.")
+    }
+  }
+}
+
+object PrimitiveTypeValidator {
+  val PRIMITIVE_TYPE = "type"
+  val PRIMITIVE_VALUE = "value"
+
+  private val LITERAL_FALSE = "false"
+  private val LITERAL_TRUE = "true"
+
+  /**
+    * Derives the value according to the type and value strings.
+    *
+    * @param keyPrefix the prefix of the primitive type key
+    * @param properties the descriptor properties
+    * @return the derived value
+    */
+  def derivePrimitiveValue(keyPrefix: String, properties: DescriptorProperties): Any = {
+    val typeInfo =
+      properties.getType(s"$keyPrefix$PRIMITIVE_TYPE")
+    val valueKey = s"$keyPrefix$PRIMITIVE_VALUE"
+    val value = typeInfo match {
+      case Types.DECIMAL => properties.getBigDecimal(valueKey)
+      case Types.BOOLEAN => properties.getBoolean(valueKey)
+      case Types.BYTE => properties.getByte(valueKey)
+      case Types.DOUBLE => properties.getDouble(valueKey)
+      case Types.FLOAT => properties.getFloat(valueKey)
+      case Types.INT => properties.getInt(valueKey)
+      case Types.LONG => properties.getLong(valueKey)
+      case Types.SHORT => properties.getShort(valueKey)
+      case Types.STRING => properties.getString(valueKey)
+      case _ => throw TableException(s"Unsupported type ${typeInfo.getTypeClass}.")
+    }
+    value
+  }
+
+  /**
+    * Derives the actual value from the type information and the string-formatted value.
+    */
+  def deriveTypeAndValueStr[T](typeInfo: TypeInformation[T], valueStr: String): T = {
+    typeInfo match {
+      case Types.DECIMAL => new JBigDecimal(valueStr).asInstanceOf[T]
+      case Types.BOOLEAN => JBoolean.parseBoolean(valueStr).asInstanceOf[T]
+      case Types.BYTE => JByte.parseByte(valueStr).asInstanceOf[T]
+      case Types.DOUBLE => JDouble.parseDouble(valueStr).asInstanceOf[T]
+      case Types.FLOAT => JFloat.parseFloat(valueStr).asInstanceOf[T]
+      case Types.INT => JInt.parseInt(valueStr).asInstanceOf[T]
+      case Types.LONG => JLong.parseLong(valueStr).asInstanceOf[T]
+      case Types.SHORT => JShort.parseShort(valueStr).asInstanceOf[T]
+      case Types.STRING => valueStr.asInstanceOf[T]
+      case _ => throw TableException(s"Unsupported type ${typeInfo.getTypeClass}.")
+    }
+  }
+
+  /**
+    * Tries to derive the type string from the given string value.
+    * The priority for deriving the type is BOOLEAN, INT, DOUBLE, and then VARCHAR.
+    *
+    * @param valueStr the string formatted value
+    * @return the type string of the given value
+    */
+  def deriveTypeStrFromValueStr(valueStr: String): String = {
+    if (valueStr.equals(LITERAL_TRUE) || valueStr.equals(LITERAL_FALSE)) {
+      TypeStringUtils.BOOLEAN.key
+    } else {
+      try {
+        valueStr.toInt
+        TypeStringUtils.INT.key
+      } catch {
+        case _: NumberFormatException =>
+          try {
+            valueStr.toDouble
+            TypeStringUtils.DOUBLE.key
+          } catch {
+            case _: NumberFormatException =>
+              TypeStringUtils.STRING.key
+          }
+      }
+    }
+  }
+}
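
The derivation order gives, for instance (expected type strings taken from
the test properties elsewhere in this patch):

    PrimitiveTypeValidator.deriveTypeStrFromValueStr("true")  // "BOOLEAN"
    PrimitiveTypeValidator.deriveTypeStrFromValueStr("42")    // "INT"
    PrimitiveTypeValidator.deriveTypeStrFromValueStr("4.2")   // "DOUBLE"
    PrimitiveTypeValidator.deriveTypeStrFromValueStr("UTC")   // "VARCHAR"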

http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/service/FunctionService.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/service/FunctionService.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/service/FunctionService.scala
new file mode 100644
index 0000000..9e97817
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/service/FunctionService.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors.service
+
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.descriptors.{ClassTypeValidator, DescriptorProperties, FunctionDescriptor, FunctionValidator, PrimitiveTypeValidator}
+import org.apache.flink.table.functions.UserDefinedFunction
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.JavaConversions._
+
+/**
+  * Utils that serve [[FunctionDescriptor]].
+  */
+object FunctionService {
+   /**
+     * Generates a user-defined function with the given properties.
+     *
+     * @param properties the descriptor properties that belong to a [[FunctionDescriptor]]
+     * @param classLoader the class loader to load the function and its parameters' classes
+     * @return the generated user-defined function
+     */
+   def generateUserDefinedFunction(
+      properties: DescriptorProperties,
+      classLoader: ClassLoader): UserDefinedFunction = {
+      new FunctionValidator().validate(properties)
+      generateInstance[UserDefinedFunction]("", properties, classLoader)
+   }
+
+   /**
+     * Recursively generates an instance of a class according to the given properties.
+     *
+     * @param keyPrefix the prefix to fetch properties
+     * @param descriptorProps the descriptor properties that contain the class type information
+     * @param classLoader the class loader to load the class
+     * @tparam T type of the generated instance
+     * @return an instance of the class
+     */
+   def generateInstance[T](
+      keyPrefix: String,
+      descriptorProps: DescriptorProperties,
+      classLoader: ClassLoader): T = {
+      val constructorPrefix = s"$keyPrefix${ClassTypeValidator.CONSTRUCTOR}"
+      val constructorProps =
+         descriptorProps.getVariableIndexedProperties(constructorPrefix, List())
+      var i = 0
+      val typeValueList: ArrayBuffer[(Class[_], Any)] = new ArrayBuffer
+      while (i < constructorProps.size()) {
+         if (constructorProps(i).containsKey(PrimitiveTypeValidator.PRIMITIVE_TYPE)) {
+            val primitiveVal = PrimitiveTypeValidator
+              .derivePrimitiveValue(s"$constructorPrefix.$i.", descriptorProps)
+            typeValueList += ((primitiveVal.getClass, primitiveVal))
+         } else if (constructorProps(i).containsKey(ClassTypeValidator.CLASS)) {
+            val typeValuePair = (
+              Class.forName(
+                 descriptorProps.getString(constructorProps(i).get(ClassTypeValidator.CLASS))),
+              generateInstance(s"$constructorPrefix.$i.", descriptorProps, classLoader))
+            typeValueList += typeValuePair
+         }
+         i += 1
+      }
+      val clazz = classLoader
+        .loadClass(descriptorProps.getString(s"$keyPrefix${ClassTypeValidator.CLASS}"))
+      val constructor = clazz.getConstructor(typeValueList.map(_._1): _*)
+      if (null == constructor) {
+         throw TableException(s"Cannot find a constructor with parameter types " +
+           s"${typeValueList.map(_._1)} for ${clazz.getName}")
+      }
+      constructor.newInstance(typeValueList.map(_._2.asInstanceOf[AnyRef]): _*).asInstanceOf[T]
+   }
+}
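
End to end, a sketch of instantiating a function from flattened properties;
the class org.example.Multiplier, assumed to be a UserDefinedFunction whose
constructor takes a java.lang.Integer, is hypothetical:

    import org.apache.flink.table.descriptors.DescriptorProperties
    import org.apache.flink.table.descriptors.service.FunctionService

    val props = new DescriptorProperties()
    props.putString("name", "multiply")
    props.putString("class", "org.example.Multiplier")
    props.putString("constructor.0.type", "INT")
    props.putString("constructor.0.value", "3")

    // Validates the properties, derives the constructor argument, and
    // reflectively instantiates the function.
    val udf = FunctionService.generateUserDefinedFunction(
      props, Thread.currentThread().getContextClassLoader)

Note that Class#getConstructor throws NoSuchMethodException rather than
returning null, so a signature mismatch surfaces as that exception before
the null check above is reached.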

http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/ClassTypeTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/ClassTypeTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/ClassTypeTest.scala
new file mode 100644
index 0000000..43710f6
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/ClassTypeTest.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import java.util.{List => JList, Map => JMap, Arrays => JArrays}
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.table.api.ValidationException
+import org.junit.Test
+
+import scala.collection.JavaConverters._
+
+class ClassTypeTest extends DescriptorTestBase {
+
+  @Test(expected = classOf[ValidationException])
+  def testMissingClass(): Unit = {
+    removePropertyAndVerify(descriptors().get(0), ClassTypeValidator.CLASS)
+  }
+
+  override def descriptors(): JList[Descriptor] = {
+    val desc1 = ClassType("class1")
+      .param(BasicTypeInfo.LONG_TYPE_INFO, "1")
+      .param(
+        ClassType("class2")
+          .param(
+            ClassType("class3")
+              .param("StarryNight")
+              .param(
+                ClassType("class4"))))
+      .param(2L)
+
+    val desc2 = ClassType().of("class2")
+
+    JArrays.asList(desc1, desc2)
+  }
+
+  override def validator(): DescriptorValidator = {
+    new ClassTypeValidator()
+  }
+
+  override def properties(): JList[JMap[String, String]] = {
+    val props1 = Map(
+      "class" -> "class1",
+      "constructor.0.type" -> "BIGINT",
+      "constructor.0.value" -> "1",
+      "constructor.1.class" -> "class2",
+      "constructor.1.constructor.0.class" -> "class3",
+      "constructor.1.constructor.0.constructor.0.type" -> "VARCHAR",
+      "constructor.1.constructor.0.constructor.0.value" -> "StarryNight",
+      "constructor.1.constructor.0.constructor.1.class" -> "class4",
+      "constructor.2.type" -> "BIGINT",
+      "constructor.2.value" -> "2"
+    )
+
+    val props2 = Map(
+      "class" -> "class2"
+    )
+
+    JArrays.asList(props1.asJava, props2.asJava)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/FunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/FunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/FunctionTest.scala
new file mode 100644
index 0000000..3b93ca25
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/FunctionTest.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import java.util.{List => JList, Map => JMap, Arrays => JArrays}
+
+import org.apache.flink.table.api.ValidationException
+import org.junit.Test
+
+import scala.collection.JavaConverters._
+
+class FunctionTest extends DescriptorTestBase {
+
+  @Test(expected = classOf[ValidationException])
+  def testMissingName(): Unit = {
+    removePropertyAndVerify(descriptors().get(0), "name")
+  }
+
+  override def descriptors(): JList[Descriptor] = {
+    val desc1 = FunctionDescriptor("func1")
+      .using(
+        ClassType("another.class")
+          .of("my.class")
+          .param("INT", "1")
+          .param(
+            ClassType()
+              .of("my.class2")
+              .strParam("true")))
+
+    JArrays.asList(desc1)
+  }
+
+  override def validator(): DescriptorValidator = {
+    new FunctionValidator()
+  }
+
+  override def properties(): JList[JMap[String, String]] = {
+    val props1 = Map(
+      "name" -> "func1",
+      "class" -> "my.class",
+      "constructor.0.type" -> "INT",
+      "constructor.0.value" -> "1",
+      "constructor.1.class" -> "my.class2",
+      "constructor.1.constructor.0.type" -> "BOOLEAN",
+      "constructor.1.constructor.0.value" -> "true"
+    )
+    JArrays.asList(props1.asJava)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/PrimitiveTypeTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/PrimitiveTypeTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/PrimitiveTypeTest.scala
new file mode 100644
index 0000000..f684e2a
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/PrimitiveTypeTest.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import java.util.{Arrays => JArrays, List => JList, Map => JMap}
+import java.math.{BigDecimal => JBigDecimal}
+
+import org.apache.flink.table.api.{Types, ValidationException}
+import org.junit.Test
+
+import scala.collection.JavaConverters._
+
+class PrimitiveTypeTest extends DescriptorTestBase {
+
+  @Test(expected = classOf[ValidationException])
+  def testMissingType(): Unit = {
+    removePropertyAndVerify(descriptors().get(0), PrimitiveTypeValidator.PRIMITIVE_TYPE)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testMissingValue(): Unit = {
+    removePropertyAndVerify(descriptors().get(0), PrimitiveTypeValidator.PRIMITIVE_VALUE)
+  }
+
+  override def descriptors(): JList[Descriptor] = {
+    val bigDecimalDesc = PrimitiveType().of(Types.DECIMAL).value(new JBigDecimal(1))
+    val booleanDesc = PrimitiveType().of(Types.BOOLEAN).value(false)
+    val byteDesc = PrimitiveType().of(Types.BYTE).value(4.asInstanceOf[Byte])
+    val doubleDesc = PrimitiveType().of(Types.DOUBLE).value(7.0)
+    val floatDesc = PrimitiveType().of(Types.FLOAT).value(8f)
+    val intDesc = PrimitiveType().of(Types.INT).value(9)
+    val longDesc = PrimitiveType().of(Types.LONG).value(10L)
+    val shortDesc = PrimitiveType().of(Types.SHORT).value(11.asInstanceOf[Short])
+    val stringDesc = PrimitiveType().of(Types.STRING).value("12")
+
+    JArrays.asList(
+      bigDecimalDesc,
+      booleanDesc,
+      byteDesc,
+      doubleDesc,
+      floatDesc,
+      intDesc,
+      longDesc,
+      shortDesc,
+      stringDesc)
+  }
+
+  override def validator(): DescriptorValidator = {
+    new PrimitiveTypeValidator()
+  }
+
+  override def properties(): JList[JMap[String, String]] = {
+    val bigDecimalProps = Map(
+      "type" -> "DECIMAL",
+      "value" -> "1"
+    )
+    val booleanDesc = Map(
+      "type" -> "BOOLEAN",
+      "value" -> "false"
+    )
+    val byteDesc = Map(
+      "type" -> "TINYINT",
+      "value" -> "4"
+    )
+
+    val doubleDesc = Map(
+      "type" -> "DOUBLE",
+      "value" -> "7.0"
+    )
+    val floatDesc = Map(
+      "type" -> "FLOAT",
+      "value" -> "8.0"
+    )
+    val intProps = Map(
+      "type" -> "INT",
+      "value" -> "9"
+    )
+    val longDesc = Map(
+      "type" -> "BIGINT",
+      "value" -> "10"
+    )
+    val shortDesc = Map(
+      "type" -> "SMALLINT",
+      "value" -> "11"
+    )
+    val stringDesc = Map(
+      "type" -> "VARCHAR",
+      "value" -> "12"
+    )
+    JArrays.asList(
+      bigDecimalProps.asJava,
+      booleanDesc.asJava,
+      byteDesc.asJava,
+      doubleDesc.asJava,
+      floatDesc.asJava,
+      intProps.asJava,
+      longDesc.asJava,
+      shortDesc.asJava,
+      stringDesc.asJava)
+  }
+}
