This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 24f1dce  [FLINK-13266][table] Port function-related descriptors to 
flink-table-common
24f1dce is described below

commit 24f1dce1ceca35ef0177fa16648fd40cbcb52ded
Author: godfreyhe <[email protected]>
AuthorDate: Thu Jul 18 19:26:36 2019 +0800

    [FLINK-13266][table] Port function-related descriptors to flink-table-common
    
    FileSystem/OldCsv/RowtimeValidator/SchemaValidator will be ported in other 
commits
---
 .../flink/table/descriptors/ClassInstance.java     | 232 +++++++++++++++++++
 .../table/descriptors/ClassInstanceValidator.java  |  70 ++++++
 .../table/descriptors/FunctionDescriptor.java      |  56 +++++
 .../descriptors/FunctionDescriptorValidator.java   |  49 +++++
 .../table/descriptors/HierarchyDescriptor.java}    |  27 ++-
 .../descriptors/HierarchyDescriptorValidator.java} |  40 ++--
 .../flink/table/descriptors/LiteralValue.java      | 187 ++++++++++++++++
 .../table/descriptors/LiteralValueValidator.java   | 162 ++++++++++++++
 .../flink/table/descriptors/ClassInstance.scala    | 245 ---------------------
 .../table/descriptors/ClassInstanceValidator.scala |  59 -----
 .../table/descriptors/FunctionDescriptor.scala     |  62 ------
 .../descriptors/FunctionDescriptorValidator.scala  |  57 -----
 .../flink/table/descriptors/LiteralValue.scala     | 228 -------------------
 .../table/descriptors/LiteralValueValidator.scala  | 145 ------------
 .../table/descriptors/ClassInstanceTest.scala      |  12 +-
 .../table/descriptors/FunctionDescriptorTest.scala |   6 +-
 .../flink/table/descriptors/LiteralValueTest.scala |  18 +-
 .../table/functions/FunctionServiceTest.scala      |  30 +--
 18 files changed, 828 insertions(+), 857 deletions(-)

diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ClassInstance.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ClassInstance.java
new file mode 100644
index 0000000..573157d
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ClassInstance.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.types.Either;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.types.Either.Left;
+import static org.apache.flink.types.Either.Right;
+
+/**
+ * Descriptor for a class instance. A class instance is a Java/Scala object 
created from a class
+ * with a public constructor (with or without parameters).
+ */
+@PublicEvolving
+public class ClassInstance extends HierarchyDescriptor {
+
+       private String className;
+       // the parameter is either a literal value or the instance of a class
+       private List<Either<LiteralValue, ClassInstance>> constructor = new 
ArrayList<>();
+
+       /**
+        * Sets the fully qualified class name for creating an instance.
+        *
+        * <p>E.g. org.example.MyClass or org.example.MyClass$StaticInnerClass
+        *
+        * @param className fully qualified class name
+        */
+       public ClassInstance of(String className) {
+               this.className = className;
+               return this;
+       }
+
+       /**
+        * Adds a constructor parameter value of literal type. The type is 
automatically derived from
+        * the value. Currently, this is supported for: BOOLEAN, INT, DOUBLE, 
and VARCHAR. Expression
+        * values are not allowed.
+        *
+        * <p>Examples:
+        * - "true", "false" -> BOOLEAN
+        * - "42", "-5" -> INT
+        * - "2.0", "1234.222" -> DOUBLE
+        * - VARCHAR otherwise
+        *
+        * <p>For other types and explicit type declaration use {@link 
#parameter(String, String)} or
+        * {@link #parameter(TypeInformation, String)}.
+        */
+       public ClassInstance parameterString(String valueString) {
+               constructor.add(Left(new LiteralValue().value(valueString)));
+               return this;
+       }
+
+       /**
+        * Adds a constructor parameter value of literal type. The type is 
explicitly defined using a
+        * type string such as VARCHAR, FLOAT, BOOLEAN, INT, BIGINT, etc. The 
value is parsed
+        * accordingly. Expression values are not allowed.
+        *
+        * @param typeString the type string that define how to parse the given 
value string
+        * @param valueString the literal value to be parsed
+        */
+       public ClassInstance parameter(String typeString, String valueString) {
+               constructor.add(Left(new 
LiteralValue().of(typeString).value(valueString)));
+               return this;
+       }
+
+       /**
+        * Adds a constructor parameter value of literal type. The type is 
explicitly defined using
+        * type information. The value is parsed accordingly. Expression values 
are not allowed.
+        *
+        * @param typeInfo the type that define how to parse the given value 
string
+        * @param valueString the literal value to be parsed
+        */
+       public ClassInstance parameter(TypeInformation<?> typeInfo, String 
valueString) {
+               constructor.add(Left(new 
LiteralValue().of(typeInfo).value(valueString)));
+               return this;
+       }
+
+       /**
+        * Adds a constructor parameter value of BOOLEAN type.
+        *
+        * @param value BOOLEAN value
+        */
+       public ClassInstance parameter(boolean value) {
+               constructor.add(Left(new 
LiteralValue().of(Types.BOOLEAN).value(value)));
+               return this;
+       }
+
+       /**
+        * Adds a constructor parameter value of DOUBLE type.
+        *
+        * @param value DOUBLE value
+        */
+       public ClassInstance parameter(double value) {
+               constructor.add(Left(new 
LiteralValue().of(Types.DOUBLE).value(value)));
+               return this;
+       }
+
+       /**
+        * Adds a constructor parameter value of FLOAT type.
+        *
+        * @param value FLOAT value
+        */
+       public ClassInstance parameter(float value) {
+               constructor.add(Left(new 
LiteralValue().of(Types.FLOAT).value(value)));
+               return this;
+       }
+
+       /**
+        * Adds a constructor parameter value of INT type.
+        *
+        * @param value INT value
+        */
+       public ClassInstance parameter(int value) {
+               constructor.add(Left(new 
LiteralValue().of(Types.INT).value(value)));
+               return this;
+       }
+
+       /**
+        * Adds a constructor parameter value of VARCHAR type.
+        *
+        * @param value VARCHAR value
+        */
+       public ClassInstance parameter(String value) {
+               constructor.add(Left(new 
LiteralValue().of(Types.STRING).value(value)));
+               return this;
+       }
+
+       /**
+        * Adds a constructor parameter value of BIGINT type.
+        *
+        * @param value BIGINT value
+        */
+       public ClassInstance parameter(long value) {
+               constructor.add(Left(new 
LiteralValue().of(Types.LONG).value(value)));
+               return this;
+       }
+
+       /**
+        * Adds a constructor parameter value of TINYINT type.
+        *
+        * @param value TINYINT value
+        */
+       public ClassInstance parameter(byte value) {
+               constructor.add(Left(new 
LiteralValue().of(Types.BYTE).value(value)));
+               return this;
+       }
+
+       /**
+        * Adds a constructor parameter value of SMALLINT type.
+        *
+        * @param value SMALLINT value
+        */
+       public ClassInstance parameter(short value) {
+               constructor.add(Left(new 
LiteralValue().of(Types.SHORT).value(value)));
+               return this;
+       }
+
+       /**
+        * Adds a constructor parameter value of DECIMAL type.
+        *
+        * @param value DECIMAL value
+        */
+       public ClassInstance parameter(BigDecimal value) {
+               constructor.add(Left(new 
LiteralValue().of(Types.BIG_DEC).value(value)));
+               return this;
+       }
+
+       /**
+        * Adds a constructor parameter value of a class instance (i.e. a Java 
object with a public
+        * constructor).
+        *
+        * @param classInstance description of a class instance (i.e. a Java 
object with a public
+        * constructor).
+        */
+       public ClassInstance parameter(ClassInstance classInstance) {
+               constructor.add(Right(classInstance));
+               return this;
+       }
+
+       /**
+        * Converts this descriptor into a set of properties.
+        */
+       @Override
+       public Map<String, String> toProperties() {
+               DescriptorProperties properties = new DescriptorProperties();
+               
addPropertiesWithPrefix(HierarchyDescriptorValidator.EMPTY_PREFIX, properties);
+               return properties.asMap();
+       }
+
+       /**
+        * Internal method for properties conversion.
+        */
+       @Override
+       public void addPropertiesWithPrefix(String keyPrefix, 
DescriptorProperties properties) {
+               if (className != null) {
+                       properties.putString(keyPrefix + 
ClassInstanceValidator.CLASS, className);
+               }
+               for (int i = 0; i < constructor.size(); ++i) {
+                       Either<LiteralValue, ClassInstance> either = 
constructor.get(i);
+                       String keyPrefixWithIdx = keyPrefix + 
ClassInstanceValidator.CONSTRUCTOR + "." + i + ".";
+                       if (either.isLeft()) {
+                               
either.left().addPropertiesWithPrefix(keyPrefixWithIdx, properties);
+                       } else {
+                               
either.right().addPropertiesWithPrefix(keyPrefixWithIdx, properties);
+                       }
+               }
+       }
+
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ClassInstanceValidator.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ClassInstanceValidator.java
new file mode 100644
index 0000000..a5abebb
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ClassInstanceValidator.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.annotation.Internal;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Validator for {@link ClassInstance}.
+ */
+@Internal
+public class ClassInstanceValidator extends HierarchyDescriptorValidator {
+
+       public static final String CLASS = "class";
+       public static final String CONSTRUCTOR = "constructor";
+
+       /**
+        * @param keyPrefix prefix to be added to every property before 
validation
+        */
+       public ClassInstanceValidator(String keyPrefix) {
+               super(keyPrefix);
+       }
+
+       public ClassInstanceValidator() {
+               this(EMPTY_PREFIX);
+       }
+
+       @Override
+       protected void validateWithPrefix(String keyPrefix, 
DescriptorProperties properties) {
+               // check class name
+               properties.validateString(keyPrefix + CLASS, false, 1);
+
+               // check constructor
+               String constructorPrefix = keyPrefix + CONSTRUCTOR;
+
+               List<Map<String, String>> constructorProperties =
+                               
properties.getVariableIndexedProperties(constructorPrefix, new ArrayList<>());
+               for (int i = 0; i < constructorProperties.size(); ++i) {
+                       String keyPrefixWithIdx = constructorPrefix + "." + i + 
".";
+                       if 
(constructorProperties.get(i).containsKey(ClassInstanceValidator.CLASS)) {
+                               ClassInstanceValidator classInstanceValidator = 
new ClassInstanceValidator(keyPrefixWithIdx);
+                               classInstanceValidator.validate(properties);
+                       }
+                       // literal value
+                       else {
+                               LiteralValueValidator primitiveValueValidator = 
new LiteralValueValidator(keyPrefixWithIdx);
+                               primitiveValueValidator.validate(properties);
+                       }
+               }
+       }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/FunctionDescriptor.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/FunctionDescriptor.java
new file mode 100644
index 0000000..72e1068
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/FunctionDescriptor.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Map;
+
+/**
+ * Descriptor for describing a function.
+ *
+ * <p>A function is currently described by the class it is created from, see
+ * {@link #fromClass(ClassInstance)}.
+ */
+@PublicEvolving
+public class FunctionDescriptor implements Descriptor {
+       private String from;
+       private ClassInstance classInstance;
+
+       /**
+        * Creates a function from a class description.
+        *
+        * @param classType description of the class that is used for the function
+        * @return this descriptor
+        */
+       public FunctionDescriptor fromClass(ClassInstance classType) {
+               this.from = FunctionDescriptorValidator.FROM_VALUE_CLASS;
+               this.classInstance = classType;
+               return this;
+       }
+
+       /**
+        * Converts this descriptor into a set of properties.
+        *
+        * @return the 'from' property (if set) together with the properties of the class instance
+        *         (if set)
+        */
+       @Override
+       public Map<String, String> toProperties() {
+               final DescriptorProperties result = new DescriptorProperties();
+               if (this.from != null) {
+                       result.putString(FunctionDescriptorValidator.FROM, this.from);
+               }
+               if (this.classInstance != null) {
+                       final Map<String, String> classProperties = this.classInstance.toProperties();
+                       result.putProperties(classProperties);
+               }
+               return result.asMap();
+       }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/FunctionDescriptorValidator.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/FunctionDescriptorValidator.java
new file mode 100644
index 0000000..059be19
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/FunctionDescriptorValidator.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.ValidationException;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+
+/**
+ * Validator for {@link FunctionDescriptor}.
+ *
+ * <p>Requires the 'from' property and validates the remaining properties depending on its value.
+ */
+@Internal
+public class FunctionDescriptorValidator implements DescriptorValidator {
+
+       public static final String FROM = "from";
+       public static final String FROM_VALUE_CLASS = "class";
+
+       @Override
+       public void validate(DescriptorProperties properties) {
+               // a function without a 'from' property cannot be validated any further
+               if (!properties.containsKey(FROM)) {
+                       throw new ValidationException("Could not find 'from' property for function.");
+               }
+
+               // one follow-up validation per supported 'from' value
+               final Map<String, Consumer<String>> validationByFromValue = new HashMap<>();
+               validationByFromValue.put(
+                               FROM_VALUE_CLASS,
+                               unused -> new ClassInstanceValidator().validate(properties));
+
+               properties.validateEnum(FROM, false, validationByFromValue);
+       }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptor.scala
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/HierarchyDescriptor.java
similarity index 61%
rename from 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptor.scala
rename to 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/HierarchyDescriptor.java
index b958b1c..a8790d7 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptor.scala
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/HierarchyDescriptor.java
@@ -16,20 +16,23 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.descriptors
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.annotation.PublicEvolving;
 
 /**
-  * A descriptor that may exist in an arbitrary level (be recursively included 
by other
-  * descriptors).
-  */
-abstract class HierarchyDescriptor extends Descriptor {
+ * A descriptor that may exist in an arbitrary level (be recursively included 
by other
+ * descriptors).
+ */
+@PublicEvolving
+public abstract class HierarchyDescriptor implements Descriptor {
 
-  /**
-    * Internal method for properties conversion. All the property keys will be 
prefixed with the
-    * given key prefix.
-    */
-  private[flink] def addPropertiesWithPrefix(
-    keyPrefix: String,
-    properties: DescriptorProperties): Unit
+       /**
+        * Internal method for properties conversion. All the property keys will be prefixed with the
+        * given key prefix.
+        *
+        * @param keyPrefix prefix to put in front of every property key written by this descriptor
+        * @param properties the properties that receive the prefixed entries
+        */
+       public abstract void addPropertiesWithPrefix(
+                       String keyPrefix,
+                       DescriptorProperties properties);
 
 }
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptorValidator.scala
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/HierarchyDescriptorValidator.java
similarity index 52%
rename from 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptorValidator.scala
rename to 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/HierarchyDescriptorValidator.java
index d7bf573..c52cebf 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptorValidator.scala
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/HierarchyDescriptorValidator.java
@@ -16,26 +16,34 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.descriptors
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.annotation.Internal;
 
 /**
-  * Validator for a [[HierarchyDescriptor]].
-  *
-  * @param keyPrefix prefix to be added to every property before validation
-  */
-abstract class HierarchyDescriptorValidator(keyPrefix: String) extends 
DescriptorValidator{
+ * Validator for a {@link HierarchyDescriptor}.
+ */
+@Internal
+public abstract class HierarchyDescriptorValidator implements 
DescriptorValidator {
 
-  final def validate(properties: DescriptorProperties): Unit = {
-    validateWithPrefix(keyPrefix, properties)
-  }
+       public static final String EMPTY_PREFIX = "";
 
-  /**
-    * Performs validation with a prefix for the keys.
-    */
-  protected def validateWithPrefix(keyPrefix: String, properties: 
DescriptorProperties): Unit
+       private String keyPrefix;
 
-}
+       /**
+        * Creates a validator that remembers the key prefix and hands it to
+        * {@link #validateWithPrefix(String, DescriptorProperties)} on every validation.
+        *
+        * @param keyPrefix prefix to be added to every property before validation
+        */
+       public HierarchyDescriptorValidator(String keyPrefix) {
+               this.keyPrefix = keyPrefix;
+       }
+
+       /**
+        * Validates the given properties by delegating to
+        * {@link #validateWithPrefix(String, DescriptorProperties)} with the key prefix that was
+        * passed to the constructor.
+        *
+        * @param properties the descriptor properties to validate
+        */
+       @Override
+       public void validate(DescriptorProperties properties) {
+               validateWithPrefix(keyPrefix, properties);
+       }
 
-object HierarchyDescriptorValidator {
-  val EMPTY_PREFIX = ""
+       /**
+        * Performs validation with a prefix for the keys.
+        *
+        * @param keyPrefix prefix of the property keys that belong to the validated descriptor
+        * @param properties the descriptor properties to validate
+        */
+       protected abstract void validateWithPrefix(String keyPrefix, 
DescriptorProperties properties);
 }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/LiteralValue.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/LiteralValue.java
new file mode 100644
index 0000000..8801169
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/LiteralValue.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.utils.TypeStringUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.math.BigDecimal;
+import java.util.Map;
+
+/**
+ * Descriptor for a literal value. A literal value consists of a type and the actual value.
+ * Expression values are not allowed.
+ *
+ * <p>If no type is set, the type is automatically derived from the value. Currently,
+ * this is supported for: BOOLEAN, INT, DOUBLE, and VARCHAR.
+ *
+ * <p>Examples:
+ * <ul>
+ *     <li>{@code "true"}, {@code "false"} -&gt; BOOLEAN</li>
+ *     <li>{@code "42"}, {@code "-5"} -&gt; INT</li>
+ *     <li>{@code "2.0"}, {@code "1234.222"} -&gt; DOUBLE</li>
+ *     <li>VARCHAR otherwise</li>
+ * </ul>
+ */
+@PublicEvolving
+public class LiteralValue extends HierarchyDescriptor {
+       // type string of the value; null means the type is implicit
+       private String typeInfo;
+       private Object value;
+
+       /**
+        * Type information of the literal value. E.g. Types.BOOLEAN.
+        *
+        * @param typeInfo type information describing the value
+        * @return this descriptor
+        */
+       public LiteralValue of(TypeInformation<?> typeInfo) {
+               Preconditions.checkNotNull(typeInfo, "Type information must not be null.");
+               this.typeInfo = TypeStringUtils.writeTypeInfo(typeInfo);
+               return this;
+       }
+
+       /**
+        * Type string of the literal value. E.g. "BOOLEAN".
+        *
+        * @param typeString type string describing the value
+        * @return this descriptor
+        */
+       public LiteralValue of(String typeString) {
+               this.typeInfo = typeString;
+               return this;
+       }
+
+       /**
+        * Literal BOOLEAN value.
+        *
+        * @param value literal BOOLEAN value
+        * @return this descriptor
+        */
+       public LiteralValue value(boolean value) {
+               this.value = value;
+               return this;
+       }
+
+       /**
+        * Literal INT value.
+        *
+        * @param value literal INT value
+        * @return this descriptor
+        */
+       public LiteralValue value(int value) {
+               this.value = value;
+               return this;
+       }
+
+       /**
+        * Literal DOUBLE value.
+        *
+        * @param value literal DOUBLE value
+        * @return this descriptor
+        */
+       public LiteralValue value(double value) {
+               this.value = value;
+               return this;
+       }
+
+       /**
+        * Literal FLOAT value.
+        *
+        * @param value literal FLOAT value
+        * @return this descriptor
+        */
+       public LiteralValue value(float value) {
+               this.value = value;
+               return this;
+       }
+
+       /**
+        * Literal value either for an explicit VARCHAR type or automatically derived type.
+        *
+        * <p>If no type is set, the type is automatically derived from the value. Currently,
+        * this is supported for: BOOLEAN, INT, DOUBLE, and VARCHAR.
+        *
+        * @param value literal value
+        * @return this descriptor
+        */
+       public LiteralValue value(String value) {
+               this.value = value;
+               return this;
+       }
+
+       /**
+        * Literal BIGINT value.
+        *
+        * @param value literal BIGINT value
+        * @return this descriptor
+        */
+       public LiteralValue value(long value) {
+               this.value = value;
+               return this;
+       }
+
+       /**
+        * Literal TINYINT value.
+        *
+        * @param value literal TINYINT value
+        * @return this descriptor
+        */
+       public LiteralValue value(byte value) {
+               this.value = value;
+               return this;
+       }
+
+       /**
+        * Literal SMALLINT value.
+        *
+        * @param value literal SMALLINT value
+        * @return this descriptor
+        */
+       public LiteralValue value(short value) {
+               this.value = value;
+               return this;
+       }
+
+       /**
+        * Literal DECIMAL value.
+        *
+        * @param value literal DECIMAL value
+        * @return this descriptor
+        */
+       public LiteralValue value(BigDecimal value) {
+               this.value = value;
+               return this;
+       }
+
+       /**
+        * Converts this descriptor into a set of properties using the empty key prefix.
+        *
+        * @return the properties of this descriptor
+        * @throws ValidationException if no type has been set, because a literal value with implicit
+        *         type must not exist in the top level of a hierarchy
+        */
+       @Override
+       public Map<String, String> toProperties() {
+               DescriptorProperties properties = new DescriptorProperties();
+               addPropertiesWithPrefix(HierarchyDescriptorValidator.EMPTY_PREFIX, properties);
+               return properties.asMap();
+       }
+
+       /**
+        * Internal method for properties conversion.
+        *
+        * <p>With an explicit type, the type and (if set) the value are written under
+        * {@code keyPrefix + "type"} and {@code keyPrefix + "value"}. With an implicit type, the value
+        * (if set) is written directly under the key prefix without its last character.
+        *
+        * @param keyPrefix prefix prepended to the keys written by this descriptor
+        * @param properties the properties to add the entries to
+        * @throws ValidationException if the type is implicit and the key prefix is empty
+        */
+       @Override
+       public void addPropertiesWithPrefix(String keyPrefix, DescriptorProperties properties) {
+               if (typeInfo != null) {
+                       // use the validator's key constants so that writer and validator cannot drift apart
+                       properties.putString(keyPrefix + LiteralValueValidator.TYPE, typeInfo);
+                       if (value != null) {
+                               properties.putString(keyPrefix + LiteralValueValidator.VALUE, String.valueOf(value));
+                       }
+               } else {
+                       // do not allow values in top-level
+                       if (keyPrefix.equals(HierarchyDescriptorValidator.EMPTY_PREFIX)) {
+                               throw new ValidationException(
+                                               "Literal values with implicit type must not exist in the top level of a hierarchy.");
+                       }
+                       if (value != null) {
+                               // NOTE(review): assumes the prefix ends with a one-character separator
+                               // (e.g. "constructor.0."), which is cut off to obtain the key itself
+                               properties.putString(keyPrefix.substring(0, keyPrefix.length() - 1), String.valueOf(value));
+                       }
+               }
+       }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/LiteralValueValidator.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/LiteralValueValidator.java
new file mode 100644
index 0000000..ffd6503
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/LiteralValueValidator.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+
+/**
+ * Validator for {@link LiteralValue}.
+ *
+ * <p>A literal is expected in one of two layouts:
+ * <ul>
+ *   <li>explicit type: the keys {@code <keyPrefix>type} and {@code <keyPrefix>value}</li>
+ *   <li>implicit type: a single key that is the prefix without its last character; the value
+ *       is then derived from the string (BOOLEAN, INT, DOUBLE, otherwise the string itself)</li>
+ * </ul>
+ */
+@Internal
+public class LiteralValueValidator extends HierarchyDescriptorValidator {
+
+       public static final String TYPE = "type";
+       public static final String VALUE = "value";
+
+       /*
+        * TODO The following types need to be supported next.
+        * Types.SQL_DATE
+        * Types.SQL_TIME
+        * Types.SQL_TIMESTAMP
+        * Types.PRIMITIVE_ARRAY
+        * Types.OBJECT_ARRAY
+        * Types.MAP
+        * Types.MULTISET
+        * null
+        */
+
+       /**
+        * Gets the value according to the type and value strings.
+        *
+        * <p>If {@code <keyPrefix>type} exists, the value is read from {@code <keyPrefix>value}
+        * with the getter matching that type. Otherwise the value is read as a string from the
+        * implicit key and its type is derived from the string.
+        *
+        * @param keyPrefix the prefix of the literal type key
+        * @param properties the descriptor properties
+        * @return the derived value
+        * @throws TableException if an explicit type is given that is not supported
+        * @throws ValidationException if no explicit type is given and the prefix is the empty
+        *     top-level prefix
+        */
+       public static Object getValue(String keyPrefix, DescriptorProperties properties) {
+               String typeKey = keyPrefix + TYPE;
+               // explicit type
+               if (properties.containsKey(typeKey)) {
+                       String valueKey = keyPrefix + VALUE;
+                       TypeInformation<?> typeInfo = properties.getType(typeKey);
+
+                       if (typeInfo.equals(Types.BIG_DEC)) {
+                               return properties.getBigDecimal(valueKey);
+                       } else if (typeInfo.equals(Types.BOOLEAN)) {
+                               return properties.getBoolean(valueKey);
+                       } else if (typeInfo.equals(Types.BYTE)) {
+                               return properties.getByte(valueKey);
+                       } else if (typeInfo.equals(Types.DOUBLE)) {
+                               return properties.getDouble(valueKey);
+                       } else if (typeInfo.equals(Types.FLOAT)) {
+                               return properties.getFloat(valueKey);
+                       } else if (typeInfo.equals(Types.INT)) {
+                               return properties.getInt(valueKey);
+                       } else if (typeInfo.equals(Types.LONG)) {
+                               return properties.getLong(valueKey);
+                       } else if (typeInfo.equals(Types.SHORT)) {
+                               return properties.getShort(valueKey);
+                       } else if (typeInfo.equals(Types.STRING)) {
+                               return properties.getString(valueKey);
+                       } else {
+                               throw new TableException("Unsupported type '" + typeInfo.getTypeClass() + "'.");
+                       }
+               }
+               // implicit type
+               else {
+                       // implicitValueKey rejects the empty prefix, so a top-level literal fails with a
+                       // descriptive ValidationException instead of a StringIndexOutOfBoundsException
+                       return deriveTypeStringFromValueString(
+                                       properties.getString(implicitValueKey(keyPrefix)));
+               }
+       }
+
+       /**
+        * Returns the key under which a literal with implicit type is stored: the given prefix
+        * without its last character (presumably the trailing separator of the prefix).
+        *
+        * @param keyPrefix the prefix of the literal
+        * @return the key of the implicitly typed value
+        * @throws ValidationException if the prefix is the empty top-level prefix, for which no
+        *     such key exists
+        */
+       private static String implicitValueKey(String keyPrefix) {
+               // do not allow values in top-level
+               if (keyPrefix.equals(HierarchyDescriptorValidator.EMPTY_PREFIX)) {
+                       throw new ValidationException(
+                                       "Literal values with implicit type must not exist in the top level of a hierarchy.");
+               }
+               return keyPrefix.substring(0, keyPrefix.length() - 1);
+       }
+
+       /**
+        * Tries to derive a literal value from the given string value.
+        * The derivation priority for the types is BOOLEAN, INT, DOUBLE, and VARCHAR.
+        *
+        * <p>Only the exact strings {@code "true"} and {@code "false"} become a BOOLEAN. INT and
+        * DOUBLE are whatever {@link Integer#valueOf(String)} and {@link Double#valueOf(String)}
+        * accept, tried in that order; everything else is returned unchanged as a string.
+        *
+        * @param valueString the string formatted value
+        * @return parsed value
+        */
+       private static Object deriveTypeStringFromValueString(String valueString) {
+               if (valueString.equals("true") || valueString.equals("false")) {
+                       return Boolean.valueOf(valueString);
+               } else {
+                       try {
+                               return Integer.valueOf(valueString);
+                       } catch (NumberFormatException e1) {
+                               // not an INT, fall through to the next candidate type
+                               try {
+                                       return Double.valueOf(valueString);
+                               } catch (NumberFormatException e2) {
+                                       // not numeric at all, keep the string as it is
+                                       return valueString;
+                               }
+                       }
+               }
+       }
+
+       /**
+        * Creates a validator for a literal value located below the given key prefix.
+        *
+        * @param keyPrefix prefix to be added to every property before validation
+        */
+       public LiteralValueValidator(String keyPrefix) {
+               super(keyPrefix);
+       }
+
+       /**
+        * Validates the literal below the given prefix, either as an explicit type/value pair or
+        * as a single implicitly typed string.
+        *
+        * @param keyPrefix the prefix of the literal
+        * @param properties the descriptor properties to validate
+        * @throws TableException if an explicit type is given that is not supported
+        * @throws ValidationException if no explicit type is given and the prefix is the empty
+        *     top-level prefix
+        */
+       @Override
+       protected void validateWithPrefix(String keyPrefix, DescriptorProperties properties) {
+               String typeKey = keyPrefix + TYPE;
+               // NOTE(review): the flags presumably mean "optional" and "no row type required";
+               // confirm against DescriptorProperties#validateType
+               properties.validateType(typeKey, true, false);
+
+               // explicit type
+               if (properties.containsKey(typeKey)) {
+                       String valueKey = keyPrefix + VALUE;
+                       TypeInformation<?> typeInfo = properties.getType(typeKey);
+                       if (typeInfo.equals(Types.BIG_DEC)) {
+                               properties.validateBigDecimal(valueKey, false);
+                       } else if (typeInfo.equals(Types.BOOLEAN)) {
+                               properties.validateBoolean(valueKey, false);
+                       } else if (typeInfo.equals(Types.BYTE)) {
+                               properties.validateByte(valueKey, false);
+                       } else if (typeInfo.equals(Types.DOUBLE)) {
+                               properties.validateDouble(valueKey, false);
+                       } else if (typeInfo.equals(Types.FLOAT)) {
+                               properties.validateFloat(valueKey, false);
+                       } else if (typeInfo.equals(Types.INT)) {
+                               properties.validateInt(valueKey, false);
+                       } else if (typeInfo.equals(Types.LONG)) {
+                               properties.validateLong(valueKey, false);
+                       } else if (typeInfo.equals(Types.SHORT)) {
+                               properties.validateShort(valueKey, false);
+                       } else if (typeInfo.equals(Types.STRING)) {
+                               properties.validateString(valueKey, false);
+                       } else {
+                               throw new TableException("Unsupported type '" + typeInfo + "'.");
+                       }
+               }
+               // implicit type
+               else {
+                       // implicitValueKey rejects the empty top-level prefix before anything is validated
+                       properties.validateString(implicitValueKey(keyPrefix), false);
+               }
+       }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/ClassInstance.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/ClassInstance.scala
deleted file mode 100644
index 34ed6dd..0000000
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/ClassInstance.scala
+++ /dev/null
@@ -1,245 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors
-
-import java.util
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.Types
-
-import scala.collection.mutable.ArrayBuffer
-
-/**
-  * Descriptor for a class instance. A class instance is a Java/Scala object 
created from a class
-  * with a public constructor (with or without parameters).
-  */
-class ClassInstance extends HierarchyDescriptor {
-
-  private var className: Option[String] = None
-
-  // the parameter is either a literal value or the instance of a class
-  private val constructor: ArrayBuffer[Either[LiteralValue, ClassInstance]] = 
ArrayBuffer()
-
-  /**
-    * Sets the fully qualified class name for creating an instance.
-    *
-    * E.g. org.example.MyClass or org.example.MyClass$StaticInnerClass
-    *
-    * @param className fully qualified class name
-    */
-  def of(className: String): ClassInstance = {
-    this.className = Option(className)
-    this
-  }
-
-  /**
-    * Adds a constructor parameter value of literal type. The type is 
automatically derived from
-    * the value. Currently, this is supported for: BOOLEAN, INT, DOUBLE, and 
VARCHAR. Expression
-    * values are not allowed.
-    *
-    * Examples:
-    *   - "true", "false" -> BOOLEAN
-    *   - "42", "-5" -> INT
-    *   - "2.0", "1234.222" -> DOUBLE
-    *   - VARCHAR otherwise
-    *
-    * For other types and explicit type declaration use [[parameter(String, 
String)]] or
-    * [[parameter(TypeInformation, String)]].
-    *
-    */
-  def parameterString(valueString: String): ClassInstance = {
-    constructor += Left(new LiteralValue().value(valueString))
-    this
-  }
-
-  /**
-    * Adds a constructor parameter value of literal type. The type is 
explicitly defined using a
-    * type string such as VARCHAR, FLOAT, BOOLEAN, INT, BIGINT, etc. The value 
is parsed
-    * accordingly. Expression values are not allowed.
-    *
-    * @param typeString the type string that define how to parse the given 
value string
-    * @param valueString the literal value to be parsed
-    */
-  def parameter(typeString: String, valueString: String): ClassInstance = {
-    constructor += Left(new LiteralValue().of(typeString).value(valueString))
-    this
-  }
-
-  /**
-    * Adds a constructor parameter value of literal type. The type is 
explicitly defined using
-    * type information. The value is parsed accordingly. Expression values are 
not allowed.
-    *
-    * @param typeInfo the type that define how to parse the given value string
-    * @param valueString the literal value to be parsed
-    */
-  def parameter(typeInfo: TypeInformation[_], valueString: String): 
ClassInstance = {
-    constructor += Left(new LiteralValue().of(typeInfo).value(valueString))
-    this
-  }
-
-  /**
-    * Adds a constructor parameter value of BOOLEAN type.
-    *
-    * @param value BOOLEAN value
-    */
-  def parameter(value: Boolean): ClassInstance = {
-    constructor += Left(new LiteralValue().of(Types.BOOLEAN).value(value))
-    this
-  }
-
-  /**
-    * Adds a constructor parameter value of DOUBLE type.
-    *
-    * @param value DOUBLE value
-    */
-  def parameter(value: Double): ClassInstance = {
-    constructor += Left(new LiteralValue().of(Types.DOUBLE).value(value))
-    this
-  }
-
-  /**
-    * Adds a constructor parameter value of FLOAT type.
-    *
-    * @param value FLOAT value
-    */
-  def parameter(value: Float): ClassInstance = {
-    constructor += Left(new LiteralValue().of(Types.FLOAT).value(value))
-    this
-  }
-
-  /**
-    * Adds a constructor parameter value of INT type.
-    *
-    * @param value INT value
-    */
-  def parameter(value: Int): ClassInstance = {
-    constructor += Left(new LiteralValue().of(Types.INT).value(value))
-    this
-  }
-
-  /**
-    * Adds a constructor parameter value of VARCHAR type.
-    *
-    * @param value VARCHAR value
-    */
-  def parameter(value: String): ClassInstance = {
-    constructor += Left(new LiteralValue().of(Types.STRING).value(value))
-    this
-  }
-
-  /**
-    * Adds a constructor parameter value of BIGINT type.
-    *
-    * @param value BIGINT value
-    */
-  def parameter(value: Long): ClassInstance = {
-    constructor += Left(new LiteralValue().of(Types.LONG).value(value))
-    this
-  }
-
-  /**
-    * Adds a constructor parameter value of TINYINT type.
-    *
-    * @param value TINYINT value
-    */
-  def parameter(value: Byte): ClassInstance = {
-    constructor += Left(new LiteralValue().of(Types.BYTE).value(value))
-    this
-  }
-
-  /**
-    * Adds a constructor parameter value of SMALLINT type.
-    *
-    * @param value SMALLINT value
-    */
-  def parameter(value: Short): ClassInstance = {
-    constructor += Left(new LiteralValue().of(Types.SHORT).value(value))
-    this
-  }
-
-  /**
-    * Adds a constructor parameter value of DECIMAL type.
-    *
-    * @param value DECIMAL value
-    */
-  def parameter(value: java.math.BigDecimal): ClassInstance = {
-    constructor += Left(new LiteralValue().of(Types.DECIMAL).value(value))
-    this
-  }
-
-  /**
-    * Adds a constructor parameter value of a class instance (i.e. a Java 
object with a public
-    * constructor).
-    *
-    * @param classInstance description of a class instance (i.e. a Java object 
with a public
-    *                      constructor).
-    */
-  def parameter(classInstance: ClassInstance): ClassInstance = {
-    constructor += Right(classInstance)
-    this
-  }
-
-  /**
-    * Converts this descriptor into a set of properties.
-    */
-  override def toProperties: util.Map[String, String] = {
-    val properties = new DescriptorProperties()
-    addPropertiesWithPrefix(HierarchyDescriptorValidator.EMPTY_PREFIX, 
properties)
-    properties.asMap()
-  }
-
-  /**
-    * Internal method for properties conversion.
-    */
-  override private[flink] def addPropertiesWithPrefix(
-      keyPrefix: String,
-      properties: DescriptorProperties): Unit = {
-    
className.foreach(properties.putString(s"$keyPrefix${ClassInstanceValidator.CLASS}",
 _))
-    var i = 0
-    while (i < constructor.size) {
-      constructor(i) match {
-        case Left(literalValue) =>
-          literalValue.addPropertiesWithPrefix(
-            s"$keyPrefix${ClassInstanceValidator.CONSTRUCTOR}.$i.",
-            properties)
-        case Right(classInstance) =>
-          classInstance.addPropertiesWithPrefix(
-            s"$keyPrefix${ClassInstanceValidator.CONSTRUCTOR}.$i.",
-            properties)
-      }
-      i += 1
-    }
-  }
-}
-
-/**
-  * Descriptor for a class instance. A class instance is a Java/Scala object 
created from a class
-  * with a public constructor (with or without parameters).
-  */
-object ClassInstance {
-
-  /**
-    * Descriptor for a class instance. A class instance is a Java/Scala object 
created from a class
-    * with a public constructor (with or without parameters).
-    *
-    * @deprecated Use `new ClassInstance()`.
-    */
-  @deprecated
-  def apply() = new ClassInstance
-}
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/ClassInstanceValidator.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/ClassInstanceValidator.scala
deleted file mode 100644
index 43da1e1..0000000
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/ClassInstanceValidator.scala
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors
-
-import 
org.apache.flink.table.descriptors.HierarchyDescriptorValidator.EMPTY_PREFIX
-
-import scala.collection.JavaConversions._
-
-/**
-  * Validator for [[ClassInstance]].
-  */
-class ClassInstanceValidator(keyPrefix: String = EMPTY_PREFIX)
-  extends HierarchyDescriptorValidator(keyPrefix) {
-
-  override def validateWithPrefix(keyPrefix: String, properties: 
DescriptorProperties): Unit = {
-    // check class name
-    properties.validateString(s"$keyPrefix${ClassInstanceValidator.CLASS}", 
false, 1)
-
-    // check constructor
-    val constructorPrefix = s"$keyPrefix${ClassInstanceValidator.CONSTRUCTOR}"
-
-    val constructorProperties = 
properties.getVariableIndexedProperties(constructorPrefix, List())
-    var i = 0
-    while (i < constructorProperties.size()) {
-      // nested class instance
-      if (constructorProperties(i).containsKey(ClassInstanceValidator.CLASS)) {
-        val classInstanceValidator = new 
ClassInstanceValidator(s"$constructorPrefix.$i.")
-        classInstanceValidator.validate(properties)
-      }
-      // literal value
-      else {
-        val primitiveValueValidator = new 
LiteralValueValidator(s"$constructorPrefix.$i.")
-        primitiveValueValidator.validate(properties)
-      }
-      i += 1
-    }
-  }
-}
-
-object ClassInstanceValidator {
-  val CLASS = "class"
-  val CONSTRUCTOR = "constructor"
-}
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/FunctionDescriptor.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/FunctionDescriptor.scala
deleted file mode 100644
index 59aef11..0000000
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/FunctionDescriptor.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors
-import java.util
-
-/**
-  * Descriptor for describing a function.
-  */
-class FunctionDescriptor extends Descriptor {
-
-  private var from: Option[String] = None
-  private var classInstance: Option[ClassInstance] = None
-
-  /**
-    * Creates a function from a class description.
-    */
-  def fromClass(classType: ClassInstance): FunctionDescriptor = {
-    from = Some(FunctionDescriptorValidator.FROM_VALUE_CLASS)
-    this.classInstance = Option(classType)
-    this
-  }
-
-  /**
-    * Converts this descriptor into a set of properties.
-    */
-  override def toProperties: util.Map[String, String] = {
-    val properties = new DescriptorProperties()
-    from.foreach(properties.putString(FunctionDescriptorValidator.FROM, _))
-    classInstance.foreach(d => properties.putProperties(d.toProperties))
-    properties.asMap()
-  }
-}
-
-/**
-  * Descriptor for describing a function.
-  */
-object FunctionDescriptor {
-
-  /**
-    * Descriptor for describing a function.
-    *
-    * @deprecated Use `new FunctionDescriptor()`.
-    */
-  @deprecated
-  def apply(): FunctionDescriptor = new FunctionDescriptor()
-}
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/FunctionDescriptorValidator.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/FunctionDescriptorValidator.scala
deleted file mode 100644
index db8a1b5..0000000
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/FunctionDescriptorValidator.scala
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors
-
-import org.apache.flink.table.api.ValidationException
-import org.apache.flink.table.descriptors.FunctionDescriptorValidator.FROM
-import org.apache.flink.table.util.JavaScalaConversionUtil.toJava
-
-import scala.collection.JavaConverters._
-
-/**
-  * Validator for [[FunctionDescriptor]].
-  */
-class FunctionDescriptorValidator extends DescriptorValidator {
-
-  override def validate(properties: DescriptorProperties): Unit = {
-
-    val classValidation = (_: String) => {
-      new ClassInstanceValidator().validate(properties)
-    }
-
-    // check for 'from'
-    if (properties.containsKey(FROM)) {
-      properties.validateEnum(
-        FROM,
-        false,
-        Map(
-          FunctionDescriptorValidator.FROM_VALUE_CLASS -> 
toJava(classValidation)
-        ).asJava
-      )
-    } else {
-      throw new ValidationException("Could not find 'from' property for 
function.")
-    }
-  }
-}
-
-object FunctionDescriptorValidator {
-
-  val FROM = "from"
-  val FROM_VALUE_CLASS = "class"
-}
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/LiteralValue.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/LiteralValue.scala
deleted file mode 100644
index 62bd6f8..0000000
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/LiteralValue.scala
+++ /dev/null
@@ -1,228 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors
-
-import java.util
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.ValidationException
-import org.apache.flink.table.utils.TypeStringUtils
-import org.apache.flink.util.Preconditions
-
-/**
-  * Descriptor for a literal value. A literal value consists of a type and the 
actual value.
-  * Expression values are not allowed.
-  *
-  * If no type is set, the type is automatically derived from the value. 
Currently,
-  * this is supported for: BOOLEAN, INT, DOUBLE, and VARCHAR.
-  *
-  * Examples:
-  *   - "true", "false" -> BOOLEAN
-  *   - "42", "-5" -> INT
-  *   - "2.0", "1234.222" -> DOUBLE
-  *   - VARCHAR otherwise
-  */
-class LiteralValue extends HierarchyDescriptor {
-
-  var typeInfo: Option[String] = None
-  var value: Option[Any] = None
-
-  /**
-    * Type information of the literal value. E.g. Types.BOOLEAN.
-    *
-    * @param typeInfo type information describing the value
-    */
-  def of(typeInfo: TypeInformation[_]): LiteralValue = {
-    Preconditions.checkNotNull("Type information must not be null.")
-    this.typeInfo = Option(TypeStringUtils.writeTypeInfo(typeInfo))
-    this
-  }
-
-  /**
-    * Type string of the literal value. E.g. "BOOLEAN".
-    *
-    * @param typeString type string describing the value
-    */
-  def of(typeString: String): LiteralValue = {
-    this.typeInfo = Option(typeString)
-    this
-  }
-
-  /**
-    * Literal BOOLEAN value.
-    *
-    * @param value literal BOOLEAN value
-    */
-  def value(value: Boolean): LiteralValue = {
-    this.value = Option(value)
-    this
-  }
-
-  /**
-    * Literal INT value.
-    *
-    * @param value literal INT value
-    */
-  def value(value: Int): LiteralValue = {
-    this.value = Option(value)
-    this
-  }
-
-  /**
-    * Literal DOUBLE value.
-    *
-    * @param value literal DOUBLE value
-    */
-  def value(value: Double): LiteralValue = {
-    this.value = Option(value)
-    this
-  }
-
-  /**
-    * Literal FLOAT value.
-    *
-    * @param value literal FLOAT value
-    */
-  def value(value: Float): LiteralValue = {
-    this.value = Option(value)
-    this
-  }
-
-  /**
-    * Literal value either for an explicit VARCHAR type or automatically 
derived type.
-    *
-    * If no type is set, the type is automatically derived from the value. 
Currently,
-    * this is supported for: BOOLEAN, INT, DOUBLE, and VARCHAR.
-    *
-    * @param value literal value
-    */
-  def value(value: String): LiteralValue = {
-    this.value = Option(value)
-    this
-  }
-
-  /**
-    * Literal BIGINT value.
-    *
-    * @param value literal BIGINT value
-    */
-  def value(value: Long): LiteralValue = {
-    this.value = Option(value)
-    this
-  }
-
-  /**
-    * Literal TINYINT value.
-    *
-    * @param value literal TINYINT value
-    */
-  def value(value: Byte): LiteralValue = {
-    this.value = Option(value)
-    this
-  }
-
-  /**
-    * Literal SMALLINT value.
-    *
-    * @param value literal SMALLINT value
-    */
-  def value(value: Short): LiteralValue = {
-    this.value = Option(value)
-    this
-  }
-
-  /**
-    * Literal DECIMAL value.
-    *
-    * @param value literal DECIMAL value
-    */
-  def value(value: java.math.BigDecimal): LiteralValue = {
-    this.value = Option(value)
-    this
-  }
-
-  /**
-    * Converts this descriptor into a set of properties.
-    */
-  override def toProperties: util.Map[String, String] = {
-    val properties = new DescriptorProperties()
-    addPropertiesWithPrefix(HierarchyDescriptorValidator.EMPTY_PREFIX, 
properties)
-    properties.asMap()
-  }
-
-  /**
-    * Internal method for properties conversion.
-    */
-  override private[flink] def addPropertiesWithPrefix(
-      keyPrefix: String,
-      properties: DescriptorProperties)
-    : Unit = {
-
-    typeInfo match {
-      // explicit type
-      case Some(ti) =>
-        properties.putString(keyPrefix + "type", ti)
-        value.foreach(v => properties.putString(keyPrefix + "value", 
String.valueOf(v)))
-      // implicit type
-      case None =>
-        // do not allow values in top-level
-        if (keyPrefix == HierarchyDescriptorValidator.EMPTY_PREFIX) {
-          throw new ValidationException(
-            "Literal values with implicit type must not exist in the top level 
of a hierarchy.")
-        }
-        value.foreach { v =>
-          properties.putString(keyPrefix.substring(0, keyPrefix.length - 1), 
String.valueOf(v))
-        }
-    }
-  }
-}
-
-/**
-  * Descriptor for a literal value. A literal value consists of a type and the 
actual value.
-  * Expression values are not allowed.
-  *
-  * If no type is set, the type is automatically derived from the value. 
Currently,
-  * this is supported for: BOOLEAN, INT, DOUBLE, and VARCHAR.
-  *
-  * Examples:
-  *   - "true", "false" -> BOOLEAN
-  *   - "42", "-5" -> INT
-  *   - "2.0", "1234.222" -> DOUBLE
-  *   - VARCHAR otherwise
-  */
-object LiteralValue {
-
-  /**
-    * Descriptor for a literal value. A literal value consists of a type and 
the actual value.
-    * Expression values are not allowed.
-    *
-    * If no type is set, the type is automatically derived from the value. 
Currently,
-    * this is supported for: BOOLEAN, INT, DOUBLE, and VARCHAR.
-    *
-    * Examples:
-    *   - "true", "false" -> BOOLEAN
-    *   - "42", "-5" -> INT
-    *   - "2.0", "1234.222" -> DOUBLE
-    *   - VARCHAR otherwise
-    *
-    *   @deprecated Use `new Literal()`.
-    */
-  @deprecated
-  def apply() = new LiteralValue()
-}
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/LiteralValueValidator.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/LiteralValueValidator.scala
deleted file mode 100644
index 4e85858..0000000
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/LiteralValueValidator.scala
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors
-
-import java.lang.{Boolean => JBoolean, Double => JDouble, Integer => JInt}
-
-import org.apache.flink.api.common.typeinfo.Types
-import org.apache.flink.table.api.{TableException, ValidationException}
-
-/**
-  * Validator for [[LiteralValue]].
-  */
-class LiteralValueValidator(keyPrefix: String) extends HierarchyDescriptorValidator(keyPrefix) {
-
-  /*
-   * TODO The following types need to be supported next.
-   * Types.SQL_DATE
-   * Types.SQL_TIME
-   * Types.SQL_TIMESTAMP
-   * Types.PRIMITIVE_ARRAY
-   * Types.OBJECT_ARRAY
-   * Types.MAP
-   * Types.MULTISET
-   * null
-   */
-
-  override protected def validateWithPrefix(
-      keyPrefix: String,
-      properties: DescriptorProperties)
-    : Unit = {
-
-    val typeKey = s"$keyPrefix${LiteralValueValidator.TYPE}"
-
-    properties.validateType(typeKey, true, false)
-
-    // explicit type
-    if (properties.containsKey(typeKey)) {
-      val valueKey = s"$keyPrefix${LiteralValueValidator.VALUE}"
-      val typeInfo = properties.getType(typeKey)
-      typeInfo match {
-        case Types.BIG_DEC => properties.validateBigDecimal(valueKey, false)
-        case Types.BOOLEAN => properties.validateBoolean(valueKey, false)
-        case Types.BYTE => properties.validateByte(valueKey, false)
-        case Types.DOUBLE => properties.validateDouble(valueKey, false)
-        case Types.FLOAT => properties.validateFloat(valueKey, false)
-        case Types.INT => properties.validateInt(valueKey, false)
-        case Types.LONG => properties.validateLong(valueKey, false)
-        case Types.SHORT => properties.validateShort(valueKey, false)
-        case Types.STRING => properties.validateString(valueKey, false)
-        case _ => throw new TableException(s"Unsupported type '$typeInfo'.")
-      }
-    }
-    // implicit type
-    else {
-      // do not allow values in top-level
-      if (keyPrefix == HierarchyDescriptorValidator.EMPTY_PREFIX) {
-        throw new ValidationException(
-          "Literal values with implicit type must not exist in the top level of a hierarchy.")
-      }
-      properties.validateString(keyPrefix.substring(0, keyPrefix.length - 1), false)
-    }
-  }
-}
-
-object LiteralValueValidator {
-  val TYPE = "type"
-  val VALUE = "value"
-
-  private val LITERAL_FALSE = "false"
-  private val LITERAL_TRUE = "true"
-
-  /**
-    * Gets the value according to the type and value strings.
-    *
-    * @param keyPrefix the prefix of the literal type key
-    * @param properties the descriptor properties
-    * @return the derived value
-    */
-  def getValue(keyPrefix: String, properties: DescriptorProperties): Any = {
-    val typeKey = s"$keyPrefix$TYPE"
-    // explicit type
-    if (properties.containsKey(typeKey)) {
-      val valueKey = s"$keyPrefix$VALUE"
-      val typeInfo = properties.getType(typeKey)
-      typeInfo match {
-        case Types.BIG_DEC => properties.getBigDecimal(valueKey)
-        case Types.BOOLEAN => properties.getBoolean(valueKey)
-        case Types.BYTE => properties.getByte(valueKey)
-        case Types.DOUBLE => properties.getDouble(valueKey)
-        case Types.FLOAT => properties.getFloat(valueKey)
-        case Types.INT => properties.getInt(valueKey)
-        case Types.LONG => properties.getLong(valueKey)
-        case Types.SHORT => properties.getShort(valueKey)
-        case Types.STRING => properties.getString(valueKey)
-        case _ => throw new TableException(s"Unsupported type '${typeInfo.getTypeClass}'.")
-      }
-    }
-    // implicit type
-    else {
-      deriveTypeStringFromValueString(
-        properties.getString(keyPrefix.substring(0, keyPrefix.length - 1)))
-    }
-  }
-
-  /**
-    * Tries to derive a literal value from the given string value.
-    * The derivation priority for the types are BOOLEAN, INT, DOUBLE, and VARCHAR.
-    *
-    * @param valueString the string formatted value
-    * @return parsed value
-    */
-  def deriveTypeStringFromValueString(valueString: String): AnyRef = {
-    if (valueString.equals(LITERAL_TRUE) || valueString.equals(LITERAL_FALSE)) {
-      JBoolean.valueOf(valueString)
-    } else {
-      try {
-        JInt.valueOf(valueString)
-      } catch {
-        case _: NumberFormatException =>
-          try {
-            JDouble.valueOf(valueString)
-          } catch {
-            case _: NumberFormatException =>
-              valueString
-          }
-      }
-    }
-  }
-}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/ClassInstanceTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/ClassInstanceTest.scala
index 8b75e15..fdb0ccf 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/ClassInstanceTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/ClassInstanceTest.scala
@@ -33,25 +33,25 @@ class ClassInstanceTest extends DescriptorTestBase {
   }
 
   override def descriptors(): JList[Descriptor] = {
-    val desc1 = ClassInstance()
+    val desc1 = new ClassInstance()
       .of("class1")
       .parameter(Types.LONG, "1")
       .parameter(
-        ClassInstance()
+        new ClassInstance()
           .of("class2")
           .parameter(
-            ClassInstance()
+            new ClassInstance()
               .of("class3")
               .parameterString("StarryNight")
               .parameter(
-                ClassInstance()
+                new ClassInstance()
                     .of("class4"))))
       .parameter(2L)
 
-    val desc2 = ClassInstance()
+    val desc2 = new ClassInstance()
       .of("class2")
 
-    val desc3 = ClassInstance()
+    val desc3 = new ClassInstance()
       .of("org.example.Function")
       .parameter(42)
       .parameter(2.asInstanceOf[Byte])
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/FunctionDescriptorTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/FunctionDescriptorTest.scala
index 1cd763f..a541850 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/FunctionDescriptorTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/FunctionDescriptorTest.scala
@@ -28,13 +28,13 @@ import scala.collection.JavaConverters._
 class FunctionDescriptorTest extends DescriptorTestBase {
 
   override def descriptors(): JList[Descriptor] = {
-    val desc1 = FunctionDescriptor()
+    val desc1 = new FunctionDescriptor()
       .fromClass(
-        ClassInstance()
+        new ClassInstance()
           .of("my.class")
           .parameter("INT", "1")
           .parameter(
-            ClassInstance()
+            new ClassInstance()
               .of("my.class2")
               .parameterString("true")))
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/LiteralValueTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/LiteralValueTest.scala
index 271d563..8f633b1 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/LiteralValueTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/LiteralValueTest.scala
@@ -43,15 +43,15 @@ class LiteralValueTest extends DescriptorTestBase {
   }
 
   override def descriptors(): JList[Descriptor] = {
-    val bigDecimalDesc = LiteralValue().of(Types.DECIMAL).value(new JBigDecimal(1))
-    val booleanDesc = LiteralValue().of(Types.BOOLEAN).value(false)
-    val byteDesc = LiteralValue().of(Types.BYTE).value(4.asInstanceOf[Byte])
-    val doubleDesc = LiteralValue().of(Types.DOUBLE).value(7.0)
-    val floatDesc = LiteralValue().of(Types.FLOAT).value(8.0f)
-    val intDesc = LiteralValue().of(Types.INT).value(9)
-    val longDesc = LiteralValue().of(Types.LONG).value(10L)
-    val shortDesc = LiteralValue().of(Types.SHORT).value(11.asInstanceOf[Short])
-    val stringDesc = LiteralValue().of(Types.STRING).value("12")
+    val bigDecimalDesc = new LiteralValue().of(Types.DECIMAL).value(new JBigDecimal(1))
+    val booleanDesc = new LiteralValue().of(Types.BOOLEAN).value(false)
+    val byteDesc = new LiteralValue().of(Types.BYTE).value(4.asInstanceOf[Byte])
+    val doubleDesc = new LiteralValue().of(Types.DOUBLE).value(7.0)
+    val floatDesc = new LiteralValue().of(Types.FLOAT).value(8.0f)
+    val intDesc = new LiteralValue().of(Types.INT).value(9)
+    val longDesc = new LiteralValue().of(Types.LONG).value(10L)
+    val shortDesc = new LiteralValue().of(Types.SHORT).value(11.asInstanceOf[Short])
+    val stringDesc = new LiteralValue().of(Types.STRING).value("12")
 
     // for tests with implicit type see ClassInstanceTest because literal value are not
     // supported in the top level of a hierarchy
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/functions/FunctionServiceTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/functions/FunctionServiceTest.scala
index bcef012..2ffc423 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/functions/FunctionServiceTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/functions/FunctionServiceTest.scala
@@ -31,8 +31,8 @@ class FunctionServiceTest {
 
   @Test(expected = classOf[ValidationException])
   def testWrongArgsFunctionCreation(): Unit = {
-    val descriptor = FunctionDescriptor()
-      .fromClass(ClassInstance()
+    val descriptor = new FunctionDescriptor()
+      .fromClass(new ClassInstance()
         .of(classOf[NoArgClass].getName)
         .parameterString("12"))
 
@@ -41,24 +41,24 @@ class FunctionServiceTest {
 
   @Test(expected = classOf[ValidationException])
   def testPrivateFunctionCreation(): Unit = {
-    val descriptor = FunctionDescriptor()
-      .fromClass(ClassInstance().of(classOf[PrivateClass].getName))
+    val descriptor = new FunctionDescriptor()
+      .fromClass(new ClassInstance().of(classOf[PrivateClass].getName))
 
     FunctionService.createFunction(descriptor)
   }
 
   @Test(expected = classOf[ValidationException])
   def testInvalidClassFunctionCreation(): Unit = {
-    val descriptor = FunctionDescriptor()
-      .fromClass(ClassInstance().of("this.class.does.not.exist"))
+    val descriptor = new FunctionDescriptor()
+      .fromClass(new ClassInstance().of("this.class.does.not.exist"))
 
     FunctionService.createFunction(descriptor)
   }
 
   @Test(expected = classOf[ValidationException])
   def testNotFunctionClassFunctionCreation(): Unit = {
-    val descriptor = FunctionDescriptor()
-      .fromClass(ClassInstance()
+    val descriptor = new FunctionDescriptor()
+      .fromClass(new ClassInstance()
         .of(classOf[java.lang.String].getName)
         .parameterString("hello"))
 
@@ -67,17 +67,17 @@ class FunctionServiceTest {
 
   @Test
   def testNoArgFunctionCreation(): Unit = {
-    val descriptor = FunctionDescriptor()
-      .fromClass(ClassInstance().of(classOf[NoArgClass].getName))
+    val descriptor = new FunctionDescriptor()
+      .fromClass(new ClassInstance().of(classOf[NoArgClass].getName))
 
     assertEquals(classOf[NoArgClass], FunctionService.createFunction(descriptor).getClass)
   }
 
   @Test
   def testOneArgFunctionCreation(): Unit = {
-    val descriptor = FunctionDescriptor()
+    val descriptor = new FunctionDescriptor()
       .fromClass(
-        ClassInstance()
+        new ClassInstance()
           .of(classOf[OneArgClass].getName)
           .parameterString("false"))
 
@@ -89,12 +89,12 @@ class FunctionServiceTest {
 
   @Test
   def testMultiArgFunctionCreation(): Unit = {
-    val descriptor = FunctionDescriptor()
+    val descriptor = new FunctionDescriptor()
       .fromClass(
-        ClassInstance()
+        new ClassInstance()
           .of(classOf[MultiArgClass].getName)
           .parameter(new java.math.BigDecimal("12.0003"))
-          .parameter(ClassInstance()
+          .parameter(new ClassInstance()
             .of(classOf[java.math.BigInteger].getName)
             .parameter("111111111111111111111111111111111")))
 

Reply via email to