[FLINK-8863] [sql-client] Add user-defined function support in SQL Client. This closes #6090.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8014dade Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8014dade Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8014dade Branch: refs/heads/master Commit: 8014dade4ba844786afa85fd517d6aba807dbc81 Parents: 79b38f8 Author: Xingcan Cui <xingc...@gmail.com> Authored: Sun May 27 23:36:25 2018 +0800 Committer: Timo Walther <twal...@apache.org> Committed: Tue Jul 10 13:55:21 2018 +0200 ---------------------------------------------------------------------- .../flink/table/client/config/Environment.java | 20 ++ .../client/config/UserDefinedFunction.java | 98 +++++++ .../client/gateway/local/ExecutionContext.java | 44 +++ .../gateway/local/LocalExecutorITCase.java | 122 ++++++-- .../gateway/utils/UserDefinedFunctions.java | 105 +++++++ .../src/test/resources/test-sql-client-udf.yaml | 82 ++++++ .../flink/table/descriptors/ClassType.scala | 123 ++++++++ .../table/descriptors/ClassTypeValidator.scala | 75 +++++ .../descriptors/DescriptorProperties.scala | 282 +++++++++++++++---- .../table/descriptors/FunctionDescriptor.scala | 52 ++++ .../table/descriptors/FunctionValidator.scala | 42 +++ .../table/descriptors/HierarchyDescriptor.scala | 34 +++ .../HierarchyDescriptorValidator.scala | 35 +++ .../flink/table/descriptors/PrimitiveType.scala | 55 ++++ .../descriptors/PrimitiveTypeValidator.scala | 146 ++++++++++ .../descriptors/service/FunctionService.scala | 87 ++++++ .../flink/table/descriptors/ClassTypeTest.scala | 77 +++++ .../flink/table/descriptors/FunctionTest.scala | 65 +++++ .../table/descriptors/PrimitiveTypeTest.scala | 117 ++++++++ 19 files changed, 1591 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java 
---------------------------------------------------------------------- diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java index 0efb665..c1db4c1 100644 --- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java +++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java @@ -46,10 +46,13 @@ public class Environment { private Deployment deployment; + private Map<String, UserDefinedFunction> functions; + public Environment() { this.tables = Collections.emptyMap(); this.execution = new Execution(); this.deployment = new Deployment(); + this.functions = Collections.emptyMap(); } public Map<String, TableDescriptor> getTables() { @@ -76,6 +79,14 @@ public class Environment { }); } + public void setFunctions(List<Map<String, Object>> functions) { + this.functions = new HashMap<>(functions.size()); + functions.forEach(config -> { + final UserDefinedFunction f = UserDefinedFunction.create(config); + this.functions.put(f.name(), f); + }); + } + public void setExecution(Map<String, Object> config) { this.execution = Execution.create(config); } @@ -92,6 +103,10 @@ public class Environment { return deployment; } + public Map<String, UserDefinedFunction> getFunctions() { + return functions; + } + @Override public String toString() { final StringBuilder sb = new StringBuilder(); @@ -136,6 +151,11 @@ public class Environment { tables.putAll(env2.getTables()); mergedEnv.tables = tables; + // merge functions + final Map<String, UserDefinedFunction> functions = new HashMap<>(env1.getFunctions()); + mergedEnv.getFunctions().putAll(env2.getFunctions()); + mergedEnv.functions = functions; + // merge execution properties mergedEnv.execution = Execution.merge(env1.getExecution(), env2.getExecution()); 
http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/UserDefinedFunction.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/UserDefinedFunction.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/UserDefinedFunction.java new file mode 100644 index 0000000..235f4ea --- /dev/null +++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/UserDefinedFunction.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.client.config; + +import org.apache.flink.table.client.SqlClientException; +import org.apache.flink.table.descriptors.DescriptorProperties; +import org.apache.flink.table.descriptors.FunctionDescriptor; +import org.apache.flink.table.descriptors.FunctionValidator; + +import java.util.Map; + +import static org.apache.flink.table.client.config.UserDefinedFunction.From.CLASS; + +/** + * Descriptor for user-defined functions. 
+ */ +public class UserDefinedFunction extends FunctionDescriptor { + + private static final String FROM = "from"; + + private From from; + + private Map<String, String> properties; + + private UserDefinedFunction(String name, From from, Map<String, String> properties) { + super(name); + this.from = from; + this.properties = properties; + } + + /** + * Gets where the user-defined function should be created from. + */ + public From getFrom() { + return from; + } + + /** + * Creates a UDF descriptor with the given config. + */ + public static UserDefinedFunction create(Map<String, Object> config) { + Map<String, String> udfConfig = ConfigUtil.normalizeYaml(config); + if (!udfConfig.containsKey(FunctionValidator.FUNCTION_NAME())) { + throw new SqlClientException("The 'name' attribute of a function is missing."); + } + + final String name = udfConfig.get(FunctionValidator.FUNCTION_NAME()); + if (name.trim().length() <= 0) { + throw new SqlClientException("Invalid function name '" + name + "'."); + } + + // the default value is "CLASS" + From fromValue = CLASS; + + if (udfConfig.containsKey(FROM)) { + final String from = udfConfig.get(FROM); + try { + fromValue = From.valueOf(from.toUpperCase()); + } catch (IllegalArgumentException ex) { + throw new SqlClientException("Unknown 'from' value '" + from + "'."); + } + } + + switch (fromValue) { + case CLASS: + return new UserDefinedFunction(name, fromValue, udfConfig); + default: + throw new SqlClientException("The from attribute can only be \"class\" now."); + } + } + + // -------------------------------------------------------------------------------------------- + + @Override + public void addProperties(DescriptorProperties properties) { + this.properties.forEach(properties::putString); + } + + enum From { + CLASS + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java 
---------------------------------------------------------------------- diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java index c62faa4..0cd9738 100644 --- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java +++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java @@ -42,11 +42,19 @@ import org.apache.flink.table.api.BatchQueryConfig; import org.apache.flink.table.api.QueryConfig; import org.apache.flink.table.api.StreamQueryConfig; import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.java.BatchTableEnvironment; +import org.apache.flink.table.api.java.StreamTableEnvironment; import org.apache.flink.table.client.config.Deployment; import org.apache.flink.table.client.config.Environment; import org.apache.flink.table.client.gateway.SessionContext; import org.apache.flink.table.client.gateway.SqlExecutionException; +import org.apache.flink.table.descriptors.DescriptorProperties; import org.apache.flink.table.descriptors.TableSourceDescriptor; +import org.apache.flink.table.descriptors.service.FunctionService; +import org.apache.flink.table.functions.AggregateFunction; +import org.apache.flink.table.functions.ScalarFunction; +import org.apache.flink.table.functions.TableFunction; +import org.apache.flink.table.functions.UserDefinedFunction; import org.apache.flink.table.sources.TableSource; import org.apache.flink.table.sources.TableSourceFactoryService; import org.apache.flink.util.FlinkException; @@ -73,6 +81,7 @@ public class ExecutionContext<T> { private final List<URL> dependencies; private final ClassLoader classLoader; private final Map<String, TableSource<?>> tableSources; + private final Map<String, 
UserDefinedFunction> functions; private final Configuration flinkConfig; private final CommandLine commandLine; private final CustomCommandLine<T> activeCommandLine; @@ -102,6 +111,16 @@ public class ExecutionContext<T> { } }); + // generate user-defined functions + functions = new HashMap<>(); + mergedEnv.getFunctions().forEach((name, descriptor) -> { + DescriptorProperties properties = new DescriptorProperties(true); + descriptor.addProperties(properties); + functions.put( + name, + FunctionService.generateUserDefinedFunction(properties, classLoader)); + }); + // convert deployment options into command line options that describe a cluster commandLine = createCommandLine(mergedEnv.getDeployment(), commandLineOptions); activeCommandLine = findActiveCommandLine(availableCommandLines, commandLine); @@ -207,6 +226,31 @@ public class ExecutionContext<T> { // register table sources tableSources.forEach(tableEnv::registerTableSource); + + // register UDFs + if (tableEnv instanceof StreamTableEnvironment) { + StreamTableEnvironment streamTableEnvironment = (StreamTableEnvironment) tableEnv; + functions.forEach((k, v) -> { + if (v instanceof ScalarFunction) { + streamTableEnvironment.registerFunction(k, (ScalarFunction) v); + } else if (v instanceof AggregateFunction) { + streamTableEnvironment.registerFunction(k, (AggregateFunction<?, ?>) v); + } else if (v instanceof TableFunction) { + streamTableEnvironment.registerFunction(k, (TableFunction<?>) v); + } + }); + } else { + BatchTableEnvironment batchTableEnvironment = (BatchTableEnvironment) tableEnv; + functions.forEach((k, v) -> { + if (v instanceof ScalarFunction) { + batchTableEnvironment.registerFunction(k, (ScalarFunction) v); + } else if (v instanceof AggregateFunction) { + batchTableEnvironment.registerFunction(k, (AggregateFunction<?, ?>) v); + } else if (v instanceof TableFunction) { + batchTableEnvironment.registerFunction(k, (TableFunction<?>) v); + } + }); + } } public QueryConfig getQueryConfig() { 
http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java index 9a1eb08..b32353f 100644 --- a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java +++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java @@ -66,6 +66,7 @@ import static org.junit.Assert.assertTrue; public class LocalExecutorITCase extends TestLogger { private static final String DEFAULTS_ENVIRONMENT_FILE = "test-sql-client-defaults.yaml"; + private static final String UDF_ENVIRONMENT_FILE = "test-sql-client-udf.yaml"; private static final int NUM_TMS = 2; private static final int NUM_SLOTS_PER_TM = 2; @@ -149,6 +150,68 @@ public class LocalExecutorITCase extends TestLogger { } @Test(timeout = 30_000L) + public void testScalarUDF() throws Exception { + final Executor executor = + createDefaultExecutor(UDF_ENVIRONMENT_FILE, clusterClient); + final SessionContext session = new SessionContext("test-scalarUDF", new Environment()); + final ResultDescriptor rd = + executor.executeQuery(session, "SELECT scalarUDF(10)"); + final List<String> actualResults = + retrieveChangelogResult(executor, session, rd.getResultId()); + final List<String> expectedResults = new ArrayList<>(); + expectedResults.add("(true,15)"); + TestBaseUtils.compareResultCollections( + expectedResults, actualResults, Comparator.naturalOrder()); + } + + @Test(timeout = 30_000L) + public void testAggregateUDF() throws Exception { + final Executor executor = + 
createDefaultExecutor(UDF_ENVIRONMENT_FILE, clusterClient); + final SessionContext session = new SessionContext("test-aggregateUDF", new Environment()); + final ResultDescriptor rd = + executor.executeQuery(session, "SELECT aggregateUDF(cast(1 as BIGINT))"); + final List<String> actualResults = + retrieveChangelogResult(executor, session, rd.getResultId()); + final List<String> expectedResults = new ArrayList<>(); + expectedResults.add("(true,100)"); + TestBaseUtils.compareResultCollections( + expectedResults, actualResults, Comparator.naturalOrder()); + } + + @Test(timeout = 30_000L) + public void testTableUDF() throws Exception { + final URL url = getClass().getClassLoader().getResource("test-data.csv"); + Objects.requireNonNull(url); + final Map<String, String> replaceVars = new HashMap<>(); + replaceVars.put("$VAR_0", url.getPath()); + final Executor executor = + createModifiedExecutor(UDF_ENVIRONMENT_FILE, clusterClient, replaceVars); + + final SessionContext session = new SessionContext("test-aggregateUDF", new Environment()); + final ResultDescriptor rd = + executor.executeQuery( + session, + "SELECT w, l from TableNumber1, LATERAL TABLE(tableUDF(StringField1)) as T(w, l)"); + final List<String> actualResults = + retrieveChangelogResult(executor, session, rd.getResultId()); + final List<String> expectedResults = new ArrayList<>(); + expectedResults.add("(true,Hello,10)"); + expectedResults.add("(true,World,10)"); + expectedResults.add("(true,Hello,10)"); + expectedResults.add("(true,World,10)"); + expectedResults.add("(true,Hello,10)"); + expectedResults.add("(true,World,10)"); + expectedResults.add("(true,Hello,10)"); + expectedResults.add("(true,World,10)"); + expectedResults.add("(true,Hello,10)"); + expectedResults.add("(true,World,10)"); + expectedResults.add("(true,Hello,10)"); + expectedResults.add("(true,World!!!!,14)"); + TestBaseUtils.compareResultCollections(expectedResults, actualResults, Comparator.naturalOrder()); + } + + @Test(timeout = 
30_000L) public void testStreamQueryExecutionChangelog() throws Exception { final URL url = getClass().getClassLoader().getResource("test-data.csv"); Objects.requireNonNull(url); @@ -167,20 +230,8 @@ public class LocalExecutorITCase extends TestLogger { assertFalse(desc.isMaterialized()); - final List<String> actualResults = new ArrayList<>(); - - while (true) { - Thread.sleep(50); // slow the processing down - final TypedResult<List<Tuple2<Boolean, Row>>> result = - executor.retrieveResultChanges(session, desc.getResultId()); - if (result.getType() == TypedResult.ResultType.PAYLOAD) { - for (Tuple2<Boolean, Row> change : result.getPayload()) { - actualResults.add(change.toString()); - } - } else if (result.getType() == TypedResult.ResultType.EOS) { - break; - } - } + final List<String> actualResults = + retrieveChangelogResult(executor, session, desc.getResultId()); final List<String> expectedResults = new ArrayList<>(); expectedResults.add("(true,42,Hello World)"); @@ -266,19 +317,32 @@ public class LocalExecutorITCase extends TestLogger { } private <T> LocalExecutor createDefaultExecutor(ClusterClient<T> clusterClient) throws Exception { + return createDefaultExecutor(DEFAULTS_ENVIRONMENT_FILE, clusterClient); + } + + private <T> LocalExecutor createDefaultExecutor( + String configFileName, ClusterClient<T> + clusterClient) throws Exception { return new LocalExecutor( - EnvironmentFileUtil.parseModified(DEFAULTS_ENVIRONMENT_FILE, Collections.singletonMap("$VAR_2", "batch")), + EnvironmentFileUtil.parseModified(configFileName, Collections.singletonMap("$VAR_2", "batch")), Collections.emptyList(), clusterClient.getFlinkConfiguration(), new DummyCustomCommandLine<T>(clusterClient)); } private <T> LocalExecutor createModifiedExecutor(ClusterClient<T> clusterClient, Map<String, String> replaceVars) throws Exception { + return createModifiedExecutor(DEFAULTS_ENVIRONMENT_FILE, clusterClient, replaceVars); + } + + private <T> LocalExecutor createModifiedExecutor( + 
String configFileName, + ClusterClient<T> clusterClient, + Map<String, String> replaceVars) throws Exception { return new LocalExecutor( - EnvironmentFileUtil.parseModified(DEFAULTS_ENVIRONMENT_FILE, replaceVars), - Collections.emptyList(), - clusterClient.getFlinkConfiguration(), - new DummyCustomCommandLine<T>(clusterClient)); + EnvironmentFileUtil.parseModified(configFileName, replaceVars), + Collections.emptyList(), + clusterClient.getFlinkConfiguration(), + new DummyCustomCommandLine<T>(clusterClient)); } private List<String> retrieveTableResult( @@ -304,4 +368,24 @@ public class LocalExecutorITCase extends TestLogger { return actualResults; } + + private List<String> retrieveChangelogResult( + Executor executor, + SessionContext session, + String resultID) throws InterruptedException { + final List<String> actualResults = new ArrayList<>(); + while (true) { + Thread.sleep(50); // slow the processing down + final TypedResult<List<Tuple2<Boolean, Row>>> result = + executor.retrieveResultChanges(session, resultID); + if (result.getType() == TypedResult.ResultType.PAYLOAD) { + for (Tuple2<Boolean, Row> change : result.getPayload()) { + actualResults.add(change.toString()); + } + } else if (result.getType() == TypedResult.ResultType.EOS) { + break; + } + } + return actualResults; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/UserDefinedFunctions.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/UserDefinedFunctions.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/UserDefinedFunctions.java new file mode 100644 index 0000000..b93ef66 --- /dev/null +++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/UserDefinedFunctions.java @@ -0,0 +1,105 
@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.client.gateway.utils; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.api.Types; +import org.apache.flink.table.functions.AggregateFunction; +import org.apache.flink.table.functions.ScalarFunction; +import org.apache.flink.table.functions.TableFunction; +import org.apache.flink.types.Row; + +/** + * A bunch of UDFs for SQL-Client test. + */ +public class UserDefinedFunctions { + + /** + * The scalar function for SQL-Client test. + */ + public static class ScalarUDF extends ScalarFunction { + + private int offset; + + public ScalarUDF(Integer offset) { + this.offset = offset; + } + + public String eval(Integer i) { + return String.valueOf(i + offset); + } + } + + /** + * The aggregate function for SQL-Client test. 
+ */ + public static class AggregateUDF extends AggregateFunction<Long, Long> { + + public AggregateUDF(String name, Boolean flag, Integer value) { + + } + + @Override + public Long createAccumulator() { + return 0L; + } + + @Override + public Long getValue(Long accumulator) { + return 100L; + } + + public void accumulate(Long acc, Long value) { + + } + + @Override + public TypeInformation<Long> getResultType() { + return BasicTypeInfo.LONG_TYPE_INFO; + } + } + + /** + * The table function for SQL-Client test. + */ + public static class TableUDF extends TableFunction<Row> { + private long extra = 2L; + + public TableUDF(Long extra) { + this.extra = extra; + } + + public void eval(String str) { + for (String s : str.split(" ")) { + Row r = new Row(2); + r.setField(0, s); + r.setField(1, s.length() + extra); + collect(r); + } + } + + @Override + public TypeInformation<Row> getResultType() { + return Types.ROW(Types.STRING(), Types.LONG()); + } + } + +} + http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-udf.yaml ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-udf.yaml b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-udf.yaml new file mode 100644 index 0000000..3e16030 --- /dev/null +++ b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-udf.yaml @@ -0,0 +1,82 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +#============================================================================== +# TEST ENVIRONMENT FILE +# Intended for org.apache.flink.table.client.gateway.local.LocalExecutorITCase. +#============================================================================== + +# this file has variables that can be filled with content by replacing $VAR_XXX + +tables: + - name: TableNumber1 + type: source + schema: + - name: IntegerField1 + type: INT + - name: StringField1 + type: VARCHAR + connector: + type: filesystem + path: "$VAR_0" + format: + type: csv + fields: + - name: IntegerField1 + type: INT + - name: StringField1 + type: VARCHAR + line-delimiter: "\n" + comment-prefix: "#" + +functions: + - name: scalarUDF + from: class + class: org.apache.flink.table.client.gateway.utils.UserDefinedFunctions$ScalarUDF + constructor: + - 5 + + - name: aggregateUDF + from: class + class: org.apache.flink.table.client.gateway.utils.UserDefinedFunctions$AggregateUDF + constructor: + - StarryName + - false + - class: java.lang.Integer + constructor: + - class: java.lang.String + constructor: + - type: VARCHAR + value: 3 + + - name: tableUDF + from: class + class: org.apache.flink.table.client.gateway.utils.UserDefinedFunctions$TableUDF + constructor: + - type: LONG + value: 5 + +execution: + type: streaming + parallelism: 1 + result-mode: changelog + +deployment: + response-timeout: 5000 + + 
http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ClassType.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ClassType.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ClassType.scala new file mode 100644 index 0000000..78ceb3a --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ClassType.scala @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.typeutils.TypeStringUtils + +import scala.collection.mutable.ArrayBuffer + +/** + * Descriptor for a class type. 
+ * + * @param className the full name of the class (e.g., java.lang.Integer) + */ +class ClassType(var className: Option[String] = None) + extends HierarchyDescriptor { + + // the parameter is either a primitive type or a class type + private val constructor: ArrayBuffer[Either[PrimitiveType[_], ClassType]] = + ArrayBuffer() + + /** + * Sets the class name for the descriptor. + */ + def of(name: String): ClassType = { + this.className = Option(name) + this + } + + /** + * Adds the given string formatted value as a parameter, the type of which will be automatically + * derived (e.g., "true" -> Boolean, "1" -> Integer, "2.0" -> Double and "abc" -> String). + * + */ + def strParam(valueStr: String): ClassType = { + val typeString = PrimitiveTypeValidator.deriveTypeStrFromValueStr(valueStr) + param(typeString, valueStr) + } + + /** + * Adds the given string formatted value as a parameter, the type of which will be decided by the + * given type string (e.g., "DOUBLE", "VARCHAR"). + */ + def param(typeStr: String, valueStr: String): ClassType = { + param(TypeStringUtils.readTypeInfo(typeStr), valueStr) + } + + /** + * Adds the given string formatted value as a parameter, the type of which will be defined by the + * given type information. + */ + def param[T](typeInfo: TypeInformation[T], valueStr: String): ClassType = { + constructor += Left( + new PrimitiveType[T]() + .of(typeInfo) + .value(PrimitiveTypeValidator.deriveTypeAndValueStr(typeInfo, valueStr))) + this + } + + /** + * Adds the given value as a parameter, the type of which will be automatically derived. + */ + def param[T](value: T): ClassType = { + constructor += Left( + new PrimitiveType[T]() + .of(TypeInformation.of(value.getClass.asInstanceOf[Class[T]])) + .value(value)) + this + } + + /** + * Adds a parameter defined by the given class type descriptor. 
+ */ + def param(field: ClassType): ClassType = { + constructor += Right(field) + this + } + + override private[flink] def addProperties(properties: DescriptorProperties): Unit = { + addPropertiesWithPrefix("", properties) + } + + override private[flink] def addPropertiesWithPrefix( + keyPrefix: String, + properties: DescriptorProperties): Unit = { + className.foreach(properties.putString(s"$keyPrefix${ClassTypeValidator.CLASS}", _)) + var i = 0 + while (i < constructor.size) { + constructor(i) match { + case Left(basicType) => + basicType.addPropertiesWithPrefix( + s"$keyPrefix${ClassTypeValidator.CONSTRUCTOR}.$i.", + properties) + case Right(classType) => + classType.addPropertiesWithPrefix( + s"$keyPrefix${ClassTypeValidator.CONSTRUCTOR}.$i.", + properties) + } + i += 1 + } + } +} + +object ClassType { + def apply() = new ClassType + def apply(className: String) = new ClassType(Option(className)) +} http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ClassTypeValidator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ClassTypeValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ClassTypeValidator.scala new file mode 100644 index 0000000..ebb48b5 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ClassTypeValidator.scala @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.api.ValidationException + +import scala.collection.JavaConversions._ + +/** + * Validator for [[ClassType]]. + */ +class ClassTypeValidator extends HierarchyDescriptorValidator { + override def validateWithPrefix(keyPrefix: String, properties: DescriptorProperties): Unit = { + + properties.validateString(s"$keyPrefix${ClassTypeValidator.CLASS}", isOptional = false) + + val constructorPrefix = s"$keyPrefix${ClassTypeValidator.CONSTRUCTOR}" + normalizeConstructorParams(constructorPrefix, properties) + + val constructorProps = + properties.getVariableIndexedProperties(constructorPrefix, List()) + var i = 0 + val primitiveTypeValidator = new PrimitiveTypeValidator + while (i < constructorProps.size()) { + if (constructorProps(i).containsKey(PrimitiveTypeValidator.PRIMITIVE_TYPE)) { + primitiveTypeValidator.validateWithPrefix(s"$constructorPrefix.$i.", properties) + } else if (constructorProps(i).containsKey(ClassTypeValidator.CLASS)) { + validateWithPrefix(s"$constructorPrefix.$i.", properties) + } else { + throw ValidationException("A constructor field must contain a 'class' or a 'type' key.") + } + i += 1 + } + } + + /** + * For each constructor parameter (e.g., constructor.0 = abc), we derive its type and replace it + * with the normalized form (e.g., constructor.0.type = VARCHAR, constructor.0.value = abc); + * + * @param constructorPrefix the prefix to get the constructor parameters + * @param properties the descriptor properties + */ + def normalizeConstructorParams( 
+ constructorPrefix: String, + properties: DescriptorProperties): Unit = { + val constructorValues = properties.getListProperties(constructorPrefix) + constructorValues.foreach(kv => { + properties.unsafeRemove(kv._1) + val tp = PrimitiveTypeValidator.deriveTypeStrFromValueStr(kv._2) + properties.putString(s"${kv._1}.${PrimitiveTypeValidator.PRIMITIVE_TYPE}", tp) + properties.putString(s"${kv._1}.${PrimitiveTypeValidator.PRIMITIVE_VALUE}", kv._2) + }) + } +} + +object ClassTypeValidator { + val CLASS = "class" + val CONSTRUCTOR = "constructor" +} http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala index 4b0b60d..8d410d0 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala @@ -19,7 +19,8 @@ package org.apache.flink.table.descriptors import java.io.Serializable -import java.lang.{Boolean => JBoolean, Double => JDouble, Integer => JInt, Long => JLong} +import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float => JFloat, Integer => JInt, Long => JLong, Short => JShort} +import java.math.{BigDecimal => JBigDecimal} import java.util import java.util.function.{Consumer, Supplier} import java.util.regex.Pattern @@ -238,10 +239,25 @@ class DescriptorProperties(normalizeKeys: Boolean = true) { } /** + * Returns a big decimal value under the given key if it exists. 
 + */ + def getOptionalBigDecimal(key: String): Optional[JBigDecimal] = { + val value = properties.get(key).map(new JBigDecimal(_)) + toJava(value) + } + + /** + * Returns a big decimal value under the given existing key. + */ + def getBigDecimal(key: String): BigDecimal = { + getOptionalBigDecimal(key).orElseThrow(exceptionSupplier(key)) + } + + /** * Returns a boolean value under the given key if it exists. */ def getOptionalBoolean(key: String): Optional[JBoolean] = { - val value = properties.get(key).map(JBoolean.parseBoolean(_)).map(Boolean.box) + val value = properties.get(key).map(JBoolean.parseBoolean).map(Boolean.box) toJava(value) } @@ -253,10 +269,55 @@ class DescriptorProperties(normalizeKeys: Boolean = true) { } /** + * Returns a byte value under the given key if it exists. + */ + def getOptionalByte(key: String): Optional[JByte] = { + val value = properties.get(key).map(JByte.parseByte).map(Byte.box) + toJava(value) + } + + /** + * Returns a byte value under the given existing key. + */ + def getByte(key: String): Byte = { + getOptionalByte(key).orElseThrow(exceptionSupplier(key)) + } + + /** + * Returns a double value under the given key if it exists. + */ + def getOptionalDouble(key: String): Optional[JDouble] = { + val value = properties.get(key).map(JDouble.parseDouble).map(Double.box) + toJava(value) + } + + /** + * Returns a double value under the given existing key. + */ + def getDouble(key: String): Double = { + getOptionalDouble(key).orElseThrow(exceptionSupplier(key)) + } + + /** + * Returns a float value under the given key if it exists. + */ + def getOptionalFloat(key: String): Optional[JFloat] = { + val value = properties.get(key).map(JFloat.parseFloat).map(Float.box) + toJava(value) + } + + /** + * Returns a float value under the given existing key. + */ + def getFloat(key: String): Float = { + getOptionalFloat(key).orElseThrow(exceptionSupplier(key)) + } + + /** * Returns an integer value under the given key if it exists. 
*/ def getOptionalInt(key: String): Optional[JInt] = { - val value = properties.get(key).map(JInt.parseInt(_)).map(Int.box) + val value = properties.get(key).map(JInt.parseInt).map(Int.box) toJava(value) } @@ -271,7 +332,7 @@ class DescriptorProperties(normalizeKeys: Boolean = true) { * Returns a long value under the given key if it exists. */ def getOptionalLong(key: String): Optional[JLong] = { - val value = properties.get(key).map(JLong.parseLong(_)).map(Long.box) + val value = properties.get(key).map(JLong.parseLong).map(Long.box) toJava(value) } @@ -283,18 +344,18 @@ class DescriptorProperties(normalizeKeys: Boolean = true) { } /** - * Returns a double value under the given key if it exists. + * Returns a short value under the given key if it exists. */ - def getOptionalDouble(key: String): Optional[JDouble] = { - val value = properties.get(key).map(JDouble.parseDouble(_)).map(Double.box) + def getOptionalShort(key: String): Optional[JShort] = { + val value = properties.get(key).map(JShort.parseShort).map(Short.box) toJava(value) } /** - * Returns a double value under the given key if it exists. + * Returns a short value under the given existing key. */ - def getDouble(key: String): Double = { - getOptionalDouble(key).orElseThrow(exceptionSupplier(key)) + def getShort(key: String): Short = { + getOptionalShort(key).orElseThrow(exceptionSupplier(key)) } /** @@ -470,6 +531,16 @@ class DescriptorProperties(normalizeKeys: Boolean = true) { } /** + * Returns all properties under a group of array formed keys. + * + * E.g. constructor -> returns all constructor.# properties. + */ + def getListProperties(key: String): JMap[String, String] = { + val escapedKey = Pattern.quote(key) + properties.filterKeys(k => k.matches(s"$escapedKey\\.\\d+")).asJava + } + + /** * Returns all properties under a given key that contains an index in between. * * E.g. 
rowtime.0.name -> returns all rowtime.#.name properties @@ -566,24 +637,7 @@ class DescriptorProperties(normalizeKeys: Boolean = true) { min: Int, // inclusive max: Int) // inclusive : Unit = { - - if (!properties.contains(key)) { - if (!isOptional) { - throw new ValidationException(s"Could not find required property '$key'.") - } - } else { - try { - val value = Integer.parseInt(properties(key)) - if (value < min || value > max) { - throw new ValidationException(s"Property '$key' must be an integer value between $min " + - s"and $max but was: ${properties(key)}") - } - } catch { - case _: NumberFormatException => - throw new ValidationException( - s"Property '$key' must be an integer value but was: ${properties(key)}") - } - } + validateComparable(key, isOptional, new Integer(min), new Integer(max), Integer.valueOf) } /** @@ -616,24 +670,7 @@ class DescriptorProperties(normalizeKeys: Boolean = true) { min: Long, // inclusive max: Long) // inclusive : Unit = { - - if (!properties.contains(key)) { - if (!isOptional) { - throw new ValidationException(s"Could not find required property '$key'.") - } - } else { - try { - val value = JLong.parseLong(properties(key)) - if (value < min || value > max) { - throw new ValidationException(s"Property '$key' must be a long value between $min " + - s"and $max but was: ${properties(key)}") - } - } catch { - case _: NumberFormatException => - throw new ValidationException( - s"Property '$key' must be a long value but was: ${properties(key)}") - } - } + validateComparable(key, isOptional, new JLong(min), new JLong(max), JLong.valueOf) } /** @@ -699,22 +736,165 @@ class DescriptorProperties(normalizeKeys: Boolean = true) { min: Double, // inclusive max: Double) // inclusive : Unit = { + validateComparable(key, isOptional, new JDouble(min), new JDouble(max), JDouble.valueOf) + } + /** + * Validates a big decimal property. 
+ */ + def validateBigDecimal( + key: String, + isOptional: Boolean): Unit = { + + if (!properties.contains(key)) { + if (!isOptional) { + throw new ValidationException(s"Could not find required property '$key'.") + } + } else { + try { + new JBigDecimal(properties(key)) + } catch { + case _: NumberFormatException => + throw new ValidationException( + s"Property '$key' must be a big decimal value but was: ${properties(key)}") + } + } + } + + /** + * Validates a big decimal property. The boundaries are inclusive. + */ + def validateBigDecimal( + key: String, + isOptional: Boolean, + min: BigDecimal, // inclusive + max: BigDecimal) // inclusive + : Unit = { + validateComparable( + key, + isOptional, + min.bigDecimal, + max.bigDecimal, + (value: String) => new JBigDecimal(value)) + } + + /** + * Validates a byte property. + */ + def validateByte( + key: String, + isOptional: Boolean): Unit = validateDouble(key, isOptional, Byte.MinValue, Byte.MaxValue) + + /** + * Validates a byte property. The boundaries are inclusive. + */ + def validateByte( + key: String, + isOptional: Boolean, + min: Byte) // inclusive + : Unit = { + validateByte(key, isOptional, min, Byte.MaxValue) + } + + /** + * Validates a byte property. The boundaries are inclusive. + */ + def validateByte( + key: String, + isOptional: Boolean, + min: Byte, // inclusive + max: Byte) // inclusive + : Unit = { + validateComparable(key, isOptional, new JByte(min), new JByte(max), JByte.valueOf) + } + + /** + * Validates a float property. + */ + def validateFloat( + key: String, + isOptional: Boolean): Unit = validateFloat(key, isOptional, Float.MinValue, Float.MaxValue) + + /** + * Validates a float property. The boundaries are inclusive. + */ + def validateFloat( + key: String, + isOptional: Boolean, + min: Float) // inclusive + : Unit = { + validateFloat(key, isOptional, min, Float.MaxValue) + } + + /** + * Validates a float property. The boundaries are inclusive. 
+ */ + def validateFloat( + key: String, + isOptional: Boolean, + min: Float, // inclusive + max: Float) // inclusive + : Unit = { + validateComparable(key, isOptional, new JFloat(min), new JFloat(max), JFloat.valueOf) + } + + /** + * Validates a short property. + */ + def validateShort( + key: String, + isOptional: Boolean): Unit = validateFloat(key, isOptional, Short.MinValue, Short.MaxValue) + + /** + * Validates a short property. The boundaries are inclusive. + */ + def validateFloat( + key: String, + isOptional: Boolean, + min: Short) // inclusive + : Unit = { + validateShort(key, isOptional, min, Short.MaxValue) + } + + /** + * Validates a float property. The boundaries are inclusive. + */ + def validateShort( + key: String, + isOptional: Boolean, + min: Short, // inclusive + max: Short) // inclusive + : Unit = { + validateComparable(key, isOptional, new JShort(min), new JShort(max), JShort.valueOf) + } + + /** + * Validates a property by first parsing the string value to a comparable object. + * The boundaries are inclusive. 
+ */ + private def validateComparable[T <: Comparable[T]]( + key: String, + isOptional: Boolean, + min: T, + max: T, + parseFunction: String => T) + : Unit = { if (!properties.contains(key)) { if (!isOptional) { throw new ValidationException(s"Could not find required property '$key'.") } } else { try { - val value = JDouble.parseDouble(properties(key)) - if (value < min || value > max) { - throw new ValidationException(s"Property '$key' must be a double value between $min " + - s"and $max but was: ${properties(key)}") + val value = parseFunction(properties(key)) + + if (value.compareTo(min) < 0 || value.compareTo(max) > 0) { + throw new ValidationException(s"Property '$key' must be a ${min.getClass.getSimpleName}" + + s" value between $min and $max but was: ${properties(key)}") } } catch { case _: NumberFormatException => throw new ValidationException( - s"Property '$key' must be an double value but was: ${properties(key)}") + s"Property '$key' must be a byte value but was: ${properties(key)}") } } } http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FunctionDescriptor.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FunctionDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FunctionDescriptor.scala new file mode 100644 index 0000000..f4c8363 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FunctionDescriptor.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +/** + * Descriptor for describing a function that can be instantiated from somewhere (e.g., a class). + * + * @param name name of the function + */ +class FunctionDescriptor(var name: String) extends Descriptor { + + var classDescriptor: Option[ClassType] = None + + /** + * Uses the class provided by the descriptor to instantiate the function. + */ + def using(classDescriptor: ClassType): FunctionDescriptor = { + this.classDescriptor = Option(classDescriptor) + this + } + + def getDescriptorProperties: DescriptorProperties = { + val descriptorProperties = new DescriptorProperties() + addProperties(descriptorProperties) + descriptorProperties + } + + override def addProperties(properties: DescriptorProperties): Unit = { + properties.putString(FunctionValidator.FUNCTION_NAME, name) + classDescriptor.foreach(_.addProperties(properties)) + } +} + +object FunctionDescriptor { + def apply(name: String): FunctionDescriptor = new FunctionDescriptor(name) +} http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FunctionValidator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FunctionValidator.scala 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FunctionValidator.scala new file mode 100644 index 0000000..f36b9c2 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FunctionValidator.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.functions.UserDefinedFunction + +import scala.collection.JavaConverters._ + +/** + * Validator for [[FunctionDescriptor]]. 
+ */ +class FunctionValidator extends DescriptorValidator { + + override def validate(properties: DescriptorProperties): Unit = { + properties.validateString(FunctionValidator.FUNCTION_NAME, isOptional = false, 1) + new ClassTypeValidator().validate(properties) + } +} + +object FunctionValidator { + + val FUNCTION_NAME = "name" + +} + + http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptor.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptor.scala new file mode 100644 index 0000000..f97c807 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptor.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +/** + * A descriptor that may exist in an arbitrary level (be recursively included by other + * descriptors). 
+ */ +abstract class HierarchyDescriptor extends Descriptor { + + /** + * Internal method for properties conversion. All the property keys will be prefixed according + * to the level. + */ + private[flink] def addPropertiesWithPrefix( + keyPrefix: String, properties: DescriptorProperties): Unit + +} http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptorValidator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptorValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptorValidator.scala new file mode 100644 index 0000000..73dd1f9 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptorValidator.scala @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +/** + * Validator for a [[HierarchyDescriptor]]. 
+ */ +trait HierarchyDescriptorValidator extends DescriptorValidator{ + + def validate(properties: DescriptorProperties): Unit = { + validateWithPrefix("", properties) + } + + /** + * Performs validation with a prefix for the keys. + */ + def validateWithPrefix(keyPrefix: String, properties: DescriptorProperties): Unit + +} http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/PrimitiveType.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/PrimitiveType.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/PrimitiveType.scala new file mode 100644 index 0000000..9e5fcab --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/PrimitiveType.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.typeutils.TypeStringUtils + +/** + * Descriptor for a primitive type. Use internally only. 
+ */ +class PrimitiveType[T] extends HierarchyDescriptor { + + var typeInformation: TypeInformation[T] = _ + var value: T = _ + + def of(basicType: TypeInformation[T]): PrimitiveType[T] = { + typeInformation = basicType + this + } + + def value(value: T): PrimitiveType[T] = { + this.value = value + this + } + + override private[flink] def addProperties(properties: DescriptorProperties): Unit = { + addPropertiesWithPrefix("", properties) + } + + override private[flink] def addPropertiesWithPrefix( + keyPrefix: String, properties: DescriptorProperties): Unit = { + properties.putString(keyPrefix + "type", TypeStringUtils.writeTypeInfo(typeInformation)) + properties.putString(keyPrefix + "value", value.toString) + } +} + +object PrimitiveType { + def apply[T]() = new PrimitiveType[T]() +} http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/PrimitiveTypeValidator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/PrimitiveTypeValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/PrimitiveTypeValidator.scala new file mode 100644 index 0000000..9b2f776 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/PrimitiveTypeValidator.scala @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float => JFloat, Integer => JInt, Long => JLong, Short => JShort} +import java.math.{BigDecimal => JBigDecimal} + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.api.{TableException, Types} +import org.apache.flink.table.typeutils.TypeStringUtils + +/** + * Validator for [[PrimitiveType]]. + */ +class PrimitiveTypeValidator extends HierarchyDescriptorValidator { + + /* + * TODO The following types need to be supported next. 
+ * Types.SQL_DATE + * Types.SQL_TIME + * Types.SQL_TIMESTAMP + * Types.INTERVAL_MONTHS + * Types.INTERVAL_MILLIS + * Types.PRIMITIVE_ARRAY + * Types.OBJECT_ARRAY + * Types.MAP + * Types.MULTISET + */ + + override def validateWithPrefix(keyPrefix: String, properties: DescriptorProperties): Unit = { + val typeKey = s"$keyPrefix${PrimitiveTypeValidator.PRIMITIVE_TYPE}" + val valueKey = s"$keyPrefix${PrimitiveTypeValidator.PRIMITIVE_VALUE}" + + properties.validateType(typeKey, isOptional = false) + + val typeInfo: TypeInformation[_] = + properties.getType(s"$keyPrefix${PrimitiveTypeValidator.PRIMITIVE_TYPE}") + typeInfo match { + case Types.DECIMAL => properties.validateBigDecimal(valueKey, isOptional = false) + case Types.BOOLEAN => properties.validateBoolean(valueKey, isOptional = false) + case Types.BYTE => properties.validateByte(valueKey, isOptional = false) + case Types.DOUBLE => properties.validateDouble(valueKey, isOptional = false) + case Types.FLOAT => properties.validateFloat(valueKey, isOptional = false) + case Types.INT => properties.validateInt(valueKey, isOptional = false) + case Types.LONG => properties.validateLong(valueKey, isOptional = false) + case Types.SHORT => properties.validateShort(valueKey, isOptional = false) + case Types.STRING => properties.validateString(valueKey, isOptional = false) + case _ => throw TableException(s"Unsupported type ${typeInfo.getTypeClass}.") + } + } +} + +object PrimitiveTypeValidator { + val PRIMITIVE_TYPE = "type" + val PRIMITIVE_VALUE = "value" + + private val LITERAL_FALSE = "false" + private val LITERAL_TRUE = "true" + + /** + * Derives the value according to the type and value strings. 
 + * + * @param keyPrefix the prefix of the primitive type key + * @param properties the descriptor properties + * @return the derived value + */ + def derivePrimitiveValue(keyPrefix: String, properties: DescriptorProperties): Any = { + val typeInfo = + properties.getType(s"$keyPrefix$PRIMITIVE_TYPE") + val valueKey = s"$keyPrefix$PRIMITIVE_VALUE" + val value = typeInfo match { + case Types.DECIMAL => properties.getBigDecimal(valueKey) + case Types.BOOLEAN => properties.getBoolean(valueKey) + case Types.BYTE => properties.getByte(valueKey) + case Types.DOUBLE => properties.getDouble(valueKey) + case Types.FLOAT => properties.getFloat(valueKey) + case Types.INT => properties.getInt(valueKey) + case Types.LONG => properties.getLong(valueKey) + case Types.SHORT => properties.getShort(valueKey) + case Types.STRING => properties.getString(valueKey) + case _ => throw TableException(s"Unsupported type ${typeInfo.getTypeClass}.") + } + value + } + + /** + * Derives the actual value from the type information and the string-formatted value. + */ + def deriveTypeAndValueStr[T](typeInfo: TypeInformation[T], valueStr: String): T = { + typeInfo match { + case Types.DECIMAL => new JBigDecimal(valueStr).asInstanceOf[T] + case Types.BOOLEAN => JBoolean.parseBoolean(valueStr).asInstanceOf[T] + case Types.BYTE => JByte.parseByte(valueStr).asInstanceOf[T] + case Types.DOUBLE => JDouble.parseDouble(valueStr).asInstanceOf[T] + case Types.FLOAT => JFloat.parseFloat(valueStr).asInstanceOf[T] + case Types.INT => JInt.parseInt(valueStr).asInstanceOf[T] + case Types.LONG => JLong.parseLong(valueStr).asInstanceOf[T] + case Types.SHORT => JShort.parseShort(valueStr).asInstanceOf[T] + case Types.STRING => valueStr.asInstanceOf[T] + case _ => throw TableException(s"Unsupported type ${typeInfo.getTypeClass}.") + } + } + +/** + * Tries to derive the type string from the given string value. + * The derivation priority of the types is BOOLEAN, INT, DOUBLE, and VARCHAR. 
+ * + * @param valueStr the string formatted value + * @return the type string of the given value + */ + def deriveTypeStrFromValueStr(valueStr: String): String = { + if (valueStr.equals(LITERAL_TRUE) || valueStr.equals(LITERAL_FALSE)) { + TypeStringUtils.BOOLEAN.key + } else { + try { + valueStr.toInt + TypeStringUtils.INT.key + } catch { + case _: NumberFormatException => + try { + valueStr.toDouble + TypeStringUtils.DOUBLE.key + } catch { + case _: NumberFormatException => + TypeStringUtils.STRING.key + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/service/FunctionService.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/service/FunctionService.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/service/FunctionService.scala new file mode 100644 index 0000000..9e97817 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/service/FunctionService.scala @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.descriptors.service + +import org.apache.flink.table.api.TableException +import org.apache.flink.table.descriptors.{ClassTypeValidator, DescriptorProperties, FunctionDescriptor, FunctionValidator, PrimitiveTypeValidator} +import org.apache.flink.table.functions.UserDefinedFunction + +import scala.collection.mutable.ArrayBuffer +import scala.collection.JavaConversions._ + +/** + * Utils that serve [[FunctionDescriptor]]. + */ +object FunctionService { + /** + * Generates a user-defined function with the given properties. + * + * @param properties the descriptor properties that belongs to a [[FunctionDescriptor]] + * @param classLoader the class loader to load the function and its parameter's classes + * @return the generated user-defined function + */ + def generateUserDefinedFunction( + properties: DescriptorProperties, + classLoader: ClassLoader): UserDefinedFunction = { + new FunctionValidator().validate(properties) + generateInstance[UserDefinedFunction]("", properties, classLoader) + } + + /** + * Recursively generate an instance of a class according the given properties. 
+ * + * @param keyPrefix the prefix to fetch properties + * @param descriptorProps the descriptor properties that contains the class type information + * @param classLoader the class loader to load the class + * @tparam T type fo the generated instance + * @return an instance of the class + */ + def generateInstance[T]( + keyPrefix: String, + descriptorProps: DescriptorProperties, + classLoader: ClassLoader): T = { + val constructorPrefix = s"$keyPrefix${ClassTypeValidator.CONSTRUCTOR}" + val constructorProps = + descriptorProps.getVariableIndexedProperties(constructorPrefix, List()) + var i = 0 + val typeValueList: ArrayBuffer[(Class[_], Any)] = new ArrayBuffer + while (i < constructorProps.size()) { + if (constructorProps(i).containsKey(PrimitiveTypeValidator.PRIMITIVE_TYPE)) { + val primitiveVal = PrimitiveTypeValidator + .derivePrimitiveValue(s"$constructorPrefix.$i.", descriptorProps) + typeValueList += ((primitiveVal.getClass, primitiveVal)) + } else if (constructorProps(i).containsKey(ClassTypeValidator.CLASS)) { + val typeValuePair = ( + Class.forName( + descriptorProps.getString(constructorProps(i).get(ClassTypeValidator.CLASS))), + generateInstance(s"$constructorPrefix.$i.", descriptorProps, classLoader)) + typeValueList += typeValuePair + } + i += 1 + } + val clazz = classLoader + .loadClass(descriptorProps.getString(s"$keyPrefix${ClassTypeValidator.CLASS}")) + val constructor = clazz.getConstructor(typeValueList.map(_._1): _*) + if (null == constructor) { + throw TableException(s"Cannot find a constructor with parameter types " + + s"${typeValueList.map(_._1)} for ${clazz.getName}") + } + constructor.newInstance(typeValueList.map(_._2.asInstanceOf[AnyRef]): _*).asInstanceOf[T] + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/ClassTypeTest.scala ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/ClassTypeTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/ClassTypeTest.scala new file mode 100644 index 0000000..43710f6 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/ClassTypeTest.scala @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.table.descriptors
+
+import java.util.{List => JList, Map => JMap, Arrays => JArrays}
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.table.api.ValidationException
+import org.junit.Test
+
+import scala.collection.JavaConverters._
+
+/**
+  * Descriptor tests for [[ClassType]], checked with a [[ClassTypeValidator]].
+  */
+class ClassTypeTest extends DescriptorTestBase {
+
+  /**
+    * Expects a [[ValidationException]] once the class property is removed from the
+    * first descriptor.
+    */
+  @Test(expected = classOf[ValidationException])
+  def testMissingClass(): Unit = {
+    val firstDescriptor = descriptors().get(0)
+    removePropertyAndVerify(firstDescriptor, ClassTypeValidator.CLASS)
+  }
+
+  /**
+    * Descriptors under test: a three-level nested class type and a bare class type.
+    */
+  override def descriptors(): JList[Descriptor] = {
+    // built inside-out: class4 is a parameter of class3, which is a parameter of class2
+    val class4 = ClassType("class4")
+    val class3 = ClassType("class3").param("StarryNight").param(class4)
+    val class2 = ClassType("class2").param(class3)
+
+    val nestedDescriptor = ClassType("class1")
+      .param(BasicTypeInfo.LONG_TYPE_INFO, "1")
+      .param(class2)
+      .param(2L)
+    val plainDescriptor = ClassType().of("class2")
+
+    JArrays.asList(nestedDescriptor, plainDescriptor)
+  }
+
+  override def validator(): DescriptorValidator = new ClassTypeValidator()
+
+  /**
+    * Expected properties, listed in the same order as [[descriptors]].
+    */
+  override def properties(): JList[JMap[String, String]] = {
+    val nestedProperties: JMap[String, String] = Map(
+      "class" -> "class1",
+      "constructor.0.type" -> "BIGINT",
+      "constructor.0.value" -> "1",
+      "constructor.1.class" -> "class2",
+      "constructor.1.constructor.0.class" -> "class3",
+      "constructor.1.constructor.0.constructor.0.type" -> "VARCHAR",
+      "constructor.1.constructor.0.constructor.0.value" -> "StarryNight",
+      "constructor.1.constructor.0.constructor.1.class" -> "class4",
+      "constructor.2.type" -> "BIGINT",
+      "constructor.2.value" -> "2"
+    ).asJava
+
+    val plainProperties: JMap[String, String] = Map("class" -> "class2").asJava
+
+    JArrays.asList(nestedProperties, plainProperties)
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/FunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/FunctionTest.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/FunctionTest.scala new file mode 100644 index 0000000..3b93ca25 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/FunctionTest.scala @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.table.descriptors
+
+import java.util.{List => JList, Map => JMap, Arrays => JArrays}
+
+import org.apache.flink.table.api.ValidationException
+import org.junit.Test
+
+import scala.collection.JavaConverters._
+
+/**
+  * Descriptor tests for [[FunctionDescriptor]], checked with a [[FunctionValidator]].
+  */
+class FunctionTest extends DescriptorTestBase {
+
+  /**
+    * Expects a [[ValidationException]] once the "name" property is removed from the
+    * first descriptor.
+    */
+  @Test(expected = classOf[ValidationException])
+  def testMissingName(): Unit = {
+    val firstDescriptor = descriptors().get(0)
+    removePropertyAndVerify(firstDescriptor, "name")
+  }
+
+  /**
+    * A single function descriptor whose class takes a typed literal and a nested class
+    * instance as constructor parameters.
+    */
+  override def descriptors(): JList[Descriptor] = {
+    val nestedParameter = ClassType()
+      .of("my.class2")
+      .strParam("true")
+
+    // the class name given at construction is replaced through of(); the expected
+    // properties below list "my.class"
+    val functionClass = ClassType("another.class")
+      .of("my.class")
+      .param("INT", "1")
+      .param(nestedParameter)
+
+    val functionDescriptor = FunctionDescriptor("func1").using(functionClass)
+
+    JArrays.asList(functionDescriptor)
+  }
+
+  override def validator(): DescriptorValidator = new FunctionValidator()
+
+  /**
+    * Expected properties of the descriptor returned by [[descriptors]].
+    */
+  override def properties(): JList[JMap[String, String]] = {
+    val functionProperties: JMap[String, String] = Map(
+      "name" -> "func1",
+      "class" -> "my.class",
+      "constructor.0.type" -> "INT",
+      "constructor.0.value" -> "1",
+      "constructor.1.class" -> "my.class2",
+      "constructor.1.constructor.0.type" -> "BOOLEAN",
+      "constructor.1.constructor.0.value" -> "true"
+    ).asJava
+
+    JArrays.asList(functionProperties)
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/PrimitiveTypeTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/PrimitiveTypeTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/PrimitiveTypeTest.scala
new file mode 100644
index 0000000..f684e2a
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/PrimitiveTypeTest.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import java.util.{Arrays => JArrays, List => JList, Map => JMap}
+import java.math.{BigDecimal => JBigDecimal}
+
+import org.apache.flink.table.api.{Types, ValidationException}
+import org.junit.Test
+
+import scala.collection.JavaConverters._
+
+/**
+  * Descriptor tests for [[PrimitiveType]], checked with a [[PrimitiveTypeValidator]].
+  */
+class PrimitiveTypeTest extends DescriptorTestBase {
+
+  /**
+    * Expects a [[ValidationException]] once the type property is removed from the
+    * first descriptor.
+    */
+  @Test(expected = classOf[ValidationException])
+  def testMissingType(): Unit = {
+    val firstDescriptor = descriptors().get(0)
+    removePropertyAndVerify(firstDescriptor, PrimitiveTypeValidator.PRIMITIVE_TYPE)
+  }
+
+  /**
+    * Expects a [[ValidationException]] once the value property is removed from the
+    * first descriptor.
+    */
+  @Test(expected = classOf[ValidationException])
+  def testMissingValue(): Unit = {
+    val firstDescriptor = descriptors().get(0)
+    removePropertyAndVerify(firstDescriptor, PrimitiveTypeValidator.PRIMITIVE_VALUE)
+  }
+
+  /**
+    * One descriptor per primitive type exercised by this test.
+    */
+  override def descriptors(): JList[Descriptor] = {
+    val decimalDescriptor = PrimitiveType().of(Types.DECIMAL).value(new JBigDecimal(1))
+    val booleanDescriptor = PrimitiveType().of(Types.BOOLEAN).value(false)
+    val byteDescriptor = PrimitiveType().of(Types.BYTE).value(4.asInstanceOf[Byte])
+    val doubleDescriptor = PrimitiveType().of(Types.DOUBLE).value(7.0)
+    val floatDescriptor = PrimitiveType().of(Types.FLOAT).value(8f)
+    val intDescriptor = PrimitiveType().of(Types.INT).value(9)
+    val longDescriptor = PrimitiveType().of(Types.LONG).value(10L)
+    val shortDescriptor = PrimitiveType().of(Types.SHORT).value(11.asInstanceOf[Short])
+    val stringDescriptor = PrimitiveType().of(Types.STRING).value("12")
+
+    JArrays.asList(
+      decimalDescriptor,
+      booleanDescriptor,
+      byteDescriptor,
+      doubleDescriptor,
+      floatDescriptor,
+      intDescriptor,
+      longDescriptor,
+      shortDescriptor,
+      stringDescriptor)
+  }
+
+  override def validator(): DescriptorValidator = new PrimitiveTypeValidator()
+
+  /**
+    * Expected properties, listed in the same order as [[descriptors]].
+    */
+  override def properties(): JList[JMap[String, String]] = {
+    // every expected map consists of exactly one "type" and one "value" entry
+    def expected(typeName: String, literal: String): JMap[String, String] =
+      Map("type" -> typeName, "value" -> literal).asJava
+
+    JArrays.asList(
+      expected("DECIMAL", "1"),
+      expected("BOOLEAN", "false"),
+      expected("TINYINT", "4"),
+      expected("DOUBLE", "7.0"),
+      expected("FLOAT", "8.0"),
+      expected("INT", "9"),
+      expected("BIGINT", "10"),
+      expected("SMALLINT", "11"),
+      expected("VARCHAR", "12"))
+  }
+}