This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 08d875e  [FLINK-10166] [table] Reduce dependencies by removing org.apache.commons
08d875e is described below

commit 08d875eadec779f68baa1cc7003dab7bdd51640a
Author: Timo Walther <[email protected]>
AuthorDate: Mon Oct 29 13:25:54 2018 +0100

    [FLINK-10166] [table] Reduce dependencies by removing org.apache.commons
    
    This commit removes all dependencies on org.apache.commons libraries in flink-table. In
    the past we only used a couple of methods that were partially pulled in from Hadoop,
    causing the issues mentioned in the JIRA ticket.
    
    This closes #6966.
---
 .../table/descriptors/DescriptorProperties.java    |   2 +-
 .../apache/flink/table/utils/EncodingUtils.java    | 151 +++++++++++++++++++--
 .../flink/table/utils/EncodingUtilsTest.java       |  93 +++++++++++++
 flink-libraries/flink-table/pom.xml                |  21 ---
 .../table/codegen/AggregationCodeGenerator.scala   |  18 +--
 .../apache/flink/table/codegen/CodeGenerator.scala |  12 +-
 .../flink/table/codegen/MatchCodeGenerator.scala   |  10 +-
 .../table/codegen/calls/HashCalcCallGen.scala      |  10 +-
 .../codegen/calls/ScalarFunctionCallGen.scala      |   3 +-
 .../table/codegen/calls/TableFunctionCallGen.scala |   3 +-
 .../apache/flink/table/expressions/literals.scala  |   7 +-
 .../table/functions/UserDefinedFunction.scala      |   6 +-
 .../functions/utils/UserDefinedFunctionUtils.scala |  26 +---
 .../org/apache/flink/table/plan/TreeNode.scala     |  12 +-
 .../table/runtime/functions/ScalarFunctions.scala  |  23 ++--
 .../flink/table/typeutils/TypeCheckUtils.scala     |  91 +++++++++++++
 .../table/runtime/harness/HarnessTestBase.scala    |  25 ++--
 .../flink/table/typeutils/TypeCheckUtilsTest.scala |  21 +++
 18 files changed, 416 insertions(+), 118 deletions(-)

diff --git 
a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java
 
b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java
index 0d2f835..c476981 100644
--- 
a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java
+++ 
b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java
@@ -1326,7 +1326,7 @@ public class DescriptorProperties {
        }
 
        public static String toString(String str) {
-               return EncodingUtils.escapeJava(str).replace("\\/", "/"); // 
'/' must not be escaped
+               return EncodingUtils.escapeJava(str);
        }
 
        public static String toString(String key, String value) {
diff --git 
a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/utils/EncodingUtils.java
 
b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/utils/EncodingUtils.java
index 41fa58e..47aac25 100644
--- 
a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/utils/EncodingUtils.java
+++ 
b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/utils/EncodingUtils.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.utils;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.util.InstantiationUtil;
 
@@ -26,9 +27,12 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.io.StringWriter;
 import java.io.Writer;
-import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
 import java.util.Base64;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
+
 /**
  * General utilities for string-encoding. This class is used to avoid 
additional dependencies
  * to other projects.
@@ -40,6 +44,8 @@ public abstract class EncodingUtils {
 
        private static final Base64.Decoder BASE64_DECODER = 
java.util.Base64.getUrlDecoder();
 
+       private static final char[] HEX_CHARS = 
"0123456789abcdef".toCharArray();
+
        private EncodingUtils() {
                // do not instantiate
        }
@@ -47,7 +53,7 @@ public abstract class EncodingUtils {
        public static String encodeObjectToString(Serializable obj) {
                try {
                        final byte[] bytes = 
InstantiationUtil.serializeObject(obj);
-                       return new String(BASE64_ENCODER.encode(bytes), 
StandardCharsets.UTF_8);
+                       return new String(BASE64_ENCODER.encode(bytes), UTF_8);
                } catch (Exception e) {
                        throw new ValidationException(
                                "Unable to serialize object '" + obj.toString() 
+ "' of class '" + obj.getClass().getName() + "'.");
@@ -60,7 +66,7 @@ public abstract class EncodingUtils {
 
        public static <T extends Serializable> T decodeStringToObject(String 
base64String, Class<T> baseClass, ClassLoader classLoader) {
                try {
-                       final byte[] bytes = 
BASE64_DECODER.decode(base64String.getBytes(StandardCharsets.UTF_8));
+                       final byte[] bytes = 
BASE64_DECODER.decode(base64String.getBytes(UTF_8));
                        final T instance = 
InstantiationUtil.deserializeObject(bytes, classLoader);
                        if (instance != null && 
!baseClass.isAssignableFrom(instance.getClass())) {
                                throw new ValidationException(
@@ -87,10 +93,138 @@ public abstract class EncodingUtils {
                return loadClass(qualifiedName, 
Thread.currentThread().getContextClassLoader());
        }
 
+       public static String encodeStringToBase64(String string) {
+               return new 
String(java.util.Base64.getEncoder().encode(string.getBytes(UTF_8)), UTF_8);
+       }
+
+       public static String decodeBase64ToString(String base64) {
+               return new 
String(java.util.Base64.getDecoder().decode(base64.getBytes(UTF_8)), UTF_8);
+       }
+
+       public static byte[] md5(String string) {
+               try {
+                       return 
MessageDigest.getInstance("MD5").digest(string.getBytes(UTF_8));
+               } catch (NoSuchAlgorithmException e) {
+                       throw new TableException("Unsupported MD5 algorithm.", 
e);
+               }
+       }
+
+       public static String hex(String string) {
+               return hex(string.getBytes(UTF_8));
+       }
+
+       public static String hex(byte[] bytes) {
+               // adopted from https://stackoverflow.com/a/9855338
+               final char[] hexChars = new char[bytes.length * 2];
+               for (int j = 0; j < bytes.length; j++) {
+                       final int v = bytes[j] & 0xFF;
+                       hexChars[j * 2] = HEX_CHARS[v >>> 4];
+                       hexChars[j * 2 + 1] = HEX_CHARS[v & 0x0F];
+               }
+               return new String(hexChars);
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       // Java String Repetition
+       //
+       // copied from o.a.commons.lang3.StringUtils (commons-lang3:3.3.2)
+       // 
--------------------------------------------------------------------------------------------
+
+       private static final String EMPTY = "";
+
+       /**
+        * The maximum size to which the padding constant(s) can expand.
+        */
+       private static final int PAD_LIMIT = 8192;
+
+       /**
+        * Repeat a String {@code repeat} times to form a new String.
+        *
+        * <pre>
+        * StringUtils.repeat(null, 2) = null
+        * StringUtils.repeat("", 0)   = ""
+        * StringUtils.repeat("", 2)   = ""
+        * StringUtils.repeat("a", 3)  = "aaa"
+        * StringUtils.repeat("ab", 2) = "abab"
+        * StringUtils.repeat("a", -2) = ""
+        * </pre>
+        *
+        * @param str    the String to repeat, may be null
+        * @param repeat number of times to repeat str, negative treated as zero
+        * @return a new String consisting of the original String repeated, 
{@code null} if null String input
+        */
+       public static String repeat(final String str, final int repeat) {
+               // Performance tuned for 2.0 (JDK1.4)
+
+               if (str == null) {
+                       return null;
+               }
+               if (repeat <= 0) {
+                       return EMPTY;
+               }
+               final int inputLength = str.length();
+               if (repeat == 1 || inputLength == 0) {
+                       return str;
+               }
+               if (inputLength == 1 && repeat <= PAD_LIMIT) {
+                       return repeat(str.charAt(0), repeat);
+               }
+
+               final int outputLength = inputLength * repeat;
+               switch (inputLength) {
+                       case 1:
+                               return repeat(str.charAt(0), repeat);
+                       case 2:
+                               final char ch0 = str.charAt(0);
+                               final char ch1 = str.charAt(1);
+                               final char[] output2 = new char[outputLength];
+                               for (int i = repeat * 2 - 2; i >= 0; i--, i--) {
+                                       output2[i] = ch0;
+                                       output2[i + 1] = ch1;
+                               }
+                               return new String(output2);
+                       default:
+                               final StringBuilder buf = new 
StringBuilder(outputLength);
+                               for (int i = 0; i < repeat; i++) {
+                                       buf.append(str);
+                               }
+                               return buf.toString();
+               }
+       }
+
+       /**
+        * Returns padding using the specified delimiter repeated to a given 
length.
+        *
+        * <pre>
+        * StringUtils.repeat('e', 0)  = ""
+        * StringUtils.repeat('e', 3)  = "eee"
+        * StringUtils.repeat('e', -2) = ""
+        * </pre>
+        *
+        * <p>Note: this method does not support padding with
+        * <a href="http://www.unicode.org/glossary/#supplementary_character">Unicode Supplementary Characters</a>
+        * as they require a pair of {@code char}s to be represented.
+        * If you are needing to support full I18N of your applications
+        * consider using {@link #repeat(String, int)} instead.
+        *
+        * @param ch     character to repeat
+        * @param repeat number of times to repeat char, negative treated as 
zero
+        * @return String with repeated character
+        * @see #repeat(String, int)
+        */
+       public static String repeat(final char ch, final int repeat) {
+               final char[] buf = new char[repeat];
+               for (int i = repeat - 1; i >= 0; i--) {
+                       buf[i] = ch;
+               }
+               return new String(buf);
+       }
+
        // 
--------------------------------------------------------------------------------------------
        // Java String Escaping
        //
-       // copied from o.a.commons.lang.StringEscapeUtils 
(commons-lang:commons-lang:2.4)
+       // copied from o.a.commons.lang.StringEscapeUtils (commons-lang:2.4)
+       // but without escaping forward slashes.
        // 
--------------------------------------------------------------------------------------------
 
        /**
@@ -197,10 +331,11 @@ public abstract class EncodingUtils {
                                                out.write('\\');
                                                out.write('\\');
                                                break;
-                                       case '/':
-                                               out.write('\\');
-                                               out.write('/');
-                                               break;
+                                       // MODIFICATION: Flink removes invalid 
escaping of forward slashes!
+                                       // case '/':
+                                       //      out.write('\\');
+                                       //      out.write('/');
+                                       //      break;
                                        default:
                                                out.write(ch);
                                                break;
diff --git 
a/flink-libraries/flink-table-common/src/test/java/org/apache/flink/table/utils/EncodingUtilsTest.java
 
b/flink-libraries/flink-table-common/src/test/java/org/apache/flink/table/utils/EncodingUtilsTest.java
new file mode 100644
index 0000000..3bc53d8
--- /dev/null
+++ 
b/flink-libraries/flink-table-common/src/test/java/org/apache/flink/table/utils/EncodingUtilsTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils;
+
+import org.junit.Test;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link org.apache.flink.table.utils.EncodingUtils}.
+ */
+public class EncodingUtilsTest {
+
+       @Test
+       public void testObjectStringEncoding() {
+               final MyPojo pojo = new MyPojo(33, "Hello");
+               final String base64 = EncodingUtils.encodeObjectToString(pojo);
+               assertEquals(pojo, EncodingUtils.decodeStringToObject(base64, 
Serializable.class));
+       }
+
+       @Test
+       public void testStringBase64Encoding() {
+               final String string = "Hello, this is apache flink.";
+               final String base64 = 
EncodingUtils.encodeStringToBase64(string);
+               assertEquals("SGVsbG8sIHRoaXMgaXMgYXBhY2hlIGZsaW5rLg==", 
base64);
+               assertEquals(string, 
EncodingUtils.decodeBase64ToString(base64));
+       }
+
+       @Test
+       public void testMd5Hex() {
+               final String string = "Hello, world! How are you? 高精确";
+               assertEquals("983abac84e994b4ba73be177e5cc298b", 
EncodingUtils.hex(EncodingUtils.md5(string)));
+       }
+
+       @Test
+       public void testJavaEscaping() {
+               assertEquals("\\\\hello\\\"world'space/", 
EncodingUtils.escapeJava("\\hello\"world'space/"));
+       }
+
+       @Test
+       public void testRepetition() {
+               assertEquals("wewewe", EncodingUtils.repeat("we", 3));
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+
+       private static class MyPojo implements Serializable {
+
+               private int number;
+               private String string;
+
+               public MyPojo(int number, String string) {
+                       this.number = number;
+                       this.string = string;
+               }
+
+               @Override
+               public boolean equals(Object o) {
+                       if (this == o) {
+                               return true;
+                       }
+                       if (o == null || getClass() != o.getClass()) {
+                               return false;
+                       }
+                       MyPojo myPojo = (MyPojo) o;
+                       return number == myPojo.number && 
Objects.equals(string, myPojo.string);
+               }
+
+               @Override
+               public int hashCode() {
+                       return Objects.hash(number, string);
+               }
+       }
+}
diff --git a/flink-libraries/flink-table/pom.xml 
b/flink-libraries/flink-table/pom.xml
index 1465371..6b2fe8c 100644
--- a/flink-libraries/flink-table/pom.xml
+++ b/flink-libraries/flink-table/pom.xml
@@ -78,18 +78,6 @@ under the License.
                        <scope>provided</scope>
                </dependency>
 
-               <!-- Used for base64 encoding of UDFs -->
-               <dependency>
-                       <groupId>commons-codec</groupId>
-                       <artifactId>commons-codec</artifactId>
-               </dependency>
-
-               <!-- Used for code generation -->
-               <dependency>
-                       <groupId>org.apache.commons</groupId>
-                       <artifactId>commons-lang3</artifactId>
-               </dependency>
-
                <!-- Used for code generation -->
                <dependency>
                        <groupId>org.apache.flink</groupId>
@@ -279,12 +267,9 @@ under the License.
                                                                        
<include>com.google.guava:guava</include>
                                                                        
<include>net.hydromatic:*</include>
                                                                        
<include>com.esri.geometry:*</include>
-                                                                       
<include>commons-lang:*</include>
 
                                                                        <!-- 
flink-table dependencies -->
                                                                        
<include>org.apache.flink:flink-table-common</include>
-                                                                       
<include>commons-codec:*</include>
-                                                                       
<include>org.apache.commons:commons-lang3</include>
                                                                        
<include>org.codehaus.janino:*</include>
                                                                        
<include>joda-time:*</include>
                                                                </includes>
@@ -320,12 +305,6 @@ under the License.
                                                                        
<pattern>org.codehaus</pattern>
                                                                        
<shadedPattern>org.apache.flink.table.shaded.org.codehaus</shadedPattern>
                                                                </relocation>-->
-
-                                                               <!-- 
commons-codec, commons-lang3, and commons-lang (from Calcite) -->
-                                                               <relocation>
-                                                                       
<pattern>org.apache.commons</pattern>
-                                                                       
<shadedPattern>org.apache.flink.table.shaded.org.apache.commons</shadedPattern>
-                                                               </relocation>
                                                        </relocations>
                                                </configuration>
                                        </execution>
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
index 11c0008..566e3d7 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
@@ -21,7 +21,6 @@ import java.lang.reflect.Modifier
 import java.lang.{Iterable => JIterable}
 
 import org.apache.calcite.rex.RexLiteral
-import org.apache.commons.codec.binary.Base64
 import org.apache.flink.api.common.state.{ListStateDescriptor, 
MapStateDescriptor, State, StateDescriptor}
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.java.typeutils.RowTypeInfo
@@ -36,8 +35,8 @@ import 
org.apache.flink.table.functions.aggfunctions.DistinctAccumulator
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
 import 
org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.{getUserDefinedMethod,
 signatureToString}
 import org.apache.flink.table.runtime.aggregate.{GeneratedAggregations, 
SingleElementIterable}
+import org.apache.flink.table.utils.EncodingUtils
 import org.apache.flink.types.Row
-import org.apache.flink.util.InstantiationUtil
 
 import scala.collection.mutable
 
@@ -315,7 +314,7 @@ class AggregationCodeGenerator(
       val dataViewTypeTerm = dataViewField.getType.getCanonicalName
 
       // define the DataView variables
-      val serializedData = serializeStateDescriptor(desc)
+      val serializedData = EncodingUtils.encodeObjectToString(desc)
       val dataViewFieldTerm = createDataViewTerm(aggIndex, 
dataViewField.getName)
       val field =
         s"""
@@ -329,9 +328,10 @@ class AggregationCodeGenerator(
       val descDeserializeCode =
         s"""
            |    $descClassQualifier $descFieldTerm = ($descClassQualifier)
-           |      org.apache.flink.util.InstantiationUtil.deserializeObject(
-           |      
org.apache.commons.codec.binary.Base64.decodeBase64("$serializedData"),
-           |      $contextTerm.getUserCodeClassLoader());
+           |      
${classOf[EncodingUtils].getCanonicalName}.decodeStringToObject(
+           |        "$serializedData",
+           |        $descClassQualifier.class,
+           |        $contextTerm.getUserCodeClassLoader());
            |""".stripMargin
       val createDataView = if (dataViewField.getType == classOf[MapView[_, 
_]]) {
         s"""
@@ -770,10 +770,4 @@ class AggregationCodeGenerator(
 
     GeneratedAggregationsFunction(funcName, funcCode)
   }
-
-  @throws[Exception]
-  def serializeStateDescriptor(stateDescriptor: StateDescriptor[_, _]): String 
= {
-    val byteArray = InstantiationUtil.serializeObject(stateDescriptor)
-    Base64.encodeBase64URLSafeString(byteArray)
-  }
 }
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
index 5d047ad..13bf50a 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
@@ -26,7 +26,6 @@ import org.apache.calcite.sql.SqlOperator
 import org.apache.calcite.sql.`type`.SqlTypeName._
 import org.apache.calcite.sql.`type`.{ReturnTypes, SqlTypeName}
 import org.apache.calcite.sql.fun.SqlStdOperatorTable.{ROW, _}
-import org.apache.commons.lang3.StringEscapeUtils
 import org.apache.flink.api.common.functions._
 import org.apache.flink.api.common.typeinfo._
 import org.apache.flink.api.common.typeutils.CompositeType
@@ -40,10 +39,10 @@ import 
org.apache.flink.table.codegen.GeneratedExpression.{ALWAYS_NULL, NEVER_NU
 import org.apache.flink.table.codegen.calls.ScalarOperators._
 import org.apache.flink.table.codegen.calls.{CurrentTimePointCallGen, 
FunctionGenerator}
 import org.apache.flink.table.functions.sql.{ProctimeSqlFunction, 
ScalarSqlFunctions, StreamRecordTimestampSqlFunction}
-import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
 import org.apache.flink.table.functions.{FunctionContext, UserDefinedFunction}
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
 import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.table.utils.EncodingUtils
 import org.joda.time.format.DateTimeFormatter
 
 import scala.collection.JavaConversions._
@@ -681,7 +680,7 @@ abstract class CodeGenerator(
         generateNonNullLiteral(resultType, decimalField)
 
       case VARCHAR | CHAR =>
-        val escapedValue = 
StringEscapeUtils.ESCAPE_JAVA.translate(value.toString)
+        val escapedValue = EncodingUtils.escapeJava(value.toString)
         generateNonNullLiteral(resultType, "\"" + escapedValue + "\"")
 
       case SYMBOL =>
@@ -1610,7 +1609,7 @@ abstract class CodeGenerator(
     */
   def addReusableFunction(function: UserDefinedFunction, contextTerm: String = 
null): String = {
     val classQualifier = function.getClass.getCanonicalName
-    val functionSerializedData = UserDefinedFunctionUtils.serialize(function)
+    val functionSerializedData = EncodingUtils.encodeObjectToString(function)
     val fieldTerm = s"function_${function.functionIdentifier}"
 
     val fieldFunction =
@@ -1622,8 +1621,9 @@ abstract class CodeGenerator(
     val functionDeserialization =
       s"""
          |$fieldTerm = ($classQualifier)
-         |${UserDefinedFunctionUtils.getClass.getName.stripSuffix("$")}
-         |.deserialize("$functionSerializedData");
+         |${classOf[EncodingUtils].getCanonicalName}.decodeStringToObject(
+         |  "$functionSerializedData",
+         |  ${classOf[UserDefinedFunction].getCanonicalName}.class);
        """.stripMargin
 
     reusableInitStatements.add(functionDeserialization)
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
index a68bf8e..791d388 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
@@ -23,7 +23,6 @@ import java.util
 
 import org.apache.calcite.rex._
 import org.apache.calcite.sql.fun.SqlStdOperatorTable._
-import org.apache.commons.lang3.StringEscapeUtils.escapeJava
 import org.apache.flink.api.common.functions._
 import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
 import org.apache.flink.cep.pattern.conditions.IterativeCondition
@@ -33,6 +32,7 @@ import 
org.apache.flink.table.codegen.CodeGenUtils.{boxedTypeTermForTypeInfo, ne
 import org.apache.flink.table.codegen.GeneratedExpression.{NEVER_NULL, NO_CODE}
 import org.apache.flink.table.codegen.Indenter.toISC
 import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.utils.EncodingUtils
 import org.apache.flink.util.Collector
 import org.apache.flink.util.MathUtils.checkedDownCast
 
@@ -102,7 +102,7 @@ class MatchCodeGenerator(
   private def addReusablePatternNames() : Unit = {
     reusableMemberStatements
       .add(s"private String[] $patternNamesTerm = new String[] { ${
-        patternNames.map(p => s""""${escapeJava(p)}"""").mkString(", ")
+        patternNames.map(p => 
s""""${EncodingUtils.escapeJava(p)}"""").mkString(", ")
       } };")
   }
 
@@ -336,7 +336,7 @@ class MatchCodeGenerator(
          |}
          """.stripMargin
     } else {
-      val escapedPatternName = escapeJava(patternName)
+      val escapedPatternName = EncodingUtils.escapeJava(patternName)
       j"""
          |java.util.List $listName = new java.util.ArrayList();
          |for ($eventTypeTerm $eventNameTerm :
@@ -373,7 +373,7 @@ class MatchCodeGenerator(
          |}
          """.stripMargin
     } else {
-      val escapedPatternName = escapeJava(patternName)
+      val escapedPatternName = EncodingUtils.escapeJava(patternName)
       j"""
          |java.util.List $listName = (java.util.List) 
$input1Term.get("$escapedPatternName");
          |if ($listName == null) {
@@ -427,7 +427,7 @@ class MatchCodeGenerator(
   }
 
   private def generatePatternFieldRef(fieldRef: RexPatternFieldRef): 
GeneratedExpression = {
-    val escapedAlpha = escapeJava(fieldRef.getAlpha)
+    val escapedAlpha = EncodingUtils.escapeJava(fieldRef.getAlpha)
     val patternVariableRef = reusableInputUnboxingExprs
       .get((s"$escapedAlpha#$first", offset)) match {
       // input access and unboxing has already been generated
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/HashCalcCallGen.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/HashCalcCallGen.scala
index d3be6e1..1ce8af8 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/HashCalcCallGen.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/HashCalcCallGen.scala
@@ -18,12 +18,13 @@
 
 package org.apache.flink.table.codegen.calls
 
-import org.apache.commons.codec.Charsets
-import org.apache.commons.codec.binary.Hex
+import java.nio.charset.StandardCharsets
+
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
 import org.apache.flink.table.codegen.CodeGenUtils.newName
 import 
org.apache.flink.table.codegen.calls.CallGenerator.generateCallWithStmtIfArgsNotNull
 import org.apache.flink.table.codegen.{CodeGenerator, GeneratedExpression}
+import org.apache.flink.table.utils.EncodingUtils
 
 class HashCalcCallGen(algName: String) extends CallGenerator {
 
@@ -68,9 +69,10 @@ class HashCalcCallGen(algName: String) extends CallGenerator 
{
         val auxiliaryStmt =
           s"""
             |${initStmt.getOrElse("")}
-            
|$md.update(${terms.head}.getBytes(${classOf[Charsets].getCanonicalName}.UTF_8));
+            |$md.update(${terms.head}
+            |  .getBytes(${classOf[StandardCharsets].getCanonicalName}.UTF_8));
             |""".stripMargin
-        val result = 
s"${classOf[Hex].getCanonicalName}.encodeHexString($md.digest())"
+        val result = 
s"${classOf[EncodingUtils].getCanonicalName}.hex($md.digest())"
         (Some(auxiliaryStmt), result)
     }
   }
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
index df206de..b91ab5a 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
@@ -18,7 +18,6 @@
 
 package org.apache.flink.table.codegen.calls
 
-import org.apache.commons.lang3.ClassUtils
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.codegen.CodeGenUtils._
 import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, 
GeneratedExpression}
@@ -68,7 +67,7 @@ class ScalarFunctionCallGen(
     val parameters = paramClasses.zip(operands).map { case (paramClass, 
operandExpr) =>
           if (paramClass.isPrimitive) {
             operandExpr
-          } else if (ClassUtils.isPrimitiveWrapper(paramClass)
+          } else if (TypeCheckUtils.isPrimitiveWrapper(paramClass)
               && TypeCheckUtils.isTemporal(operandExpr.resultType)) {
             // we use primitives to represent temporal types internally, so no 
casting needed here
             val exprOrNull: String = if (codeGenerator.nullCheck) {
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TableFunctionCallGen.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TableFunctionCallGen.scala
index e1ad18f..6bcfc6e 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TableFunctionCallGen.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TableFunctionCallGen.scala
@@ -18,7 +18,6 @@
 
 package org.apache.flink.table.codegen.calls
 
-import org.apache.commons.lang3.ClassUtils
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.codegen.CodeGenUtils._
 import org.apache.flink.table.codegen.GeneratedExpression.NEVER_NULL
@@ -68,7 +67,7 @@ class TableFunctionCallGen(
     val parameters = paramClasses.zip(operands).map { case (paramClass, 
operandExpr) =>
           if (paramClass.isPrimitive) {
             operandExpr
-          } else if (ClassUtils.isPrimitiveWrapper(paramClass)
+          } else if (TypeCheckUtils.isPrimitiveWrapper(paramClass)
               && TypeCheckUtils.isTemporal(operandExpr.resultType)) {
             // we use primitives to represent temporal types internally, so no 
casting needed here
             val exprOrNull: String = if (codeGenerator.nullCheck) {
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala
index 419c125..24bae90 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala
@@ -17,6 +17,9 @@
  */
 package org.apache.flink.table.expressions
 
+import java.sql.{Date, Time, Timestamp}
+import java.util.{Calendar, TimeZone}
+
 import org.apache.calcite.avatica.util.TimeUnit
 import org.apache.calcite.rex.RexNode
 import org.apache.calcite.sql.SqlIntervalQualifier
@@ -27,10 +30,6 @@ import org.apache.calcite.util.{DateString, TimeString, 
TimestampString}
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo, 
TypeInformation}
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, 
TimeIntervalTypeInfo}
-import java.sql.{Date, Time, Timestamp}
-import java.util.{Calendar, TimeZone}
-
-import org.apache.commons.lang3.StringEscapeUtils
 
 object Literal {
   private[flink] val UTC = TimeZone.getTimeZone("UTC")
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala
index 15bcb17..89ba0d4 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala
@@ -17,8 +17,8 @@
  */
 package org.apache.flink.table.functions
 
-import org.apache.commons.codec.digest.DigestUtils
-import 
org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.serialize
+import org.apache.flink.table.utils.EncodingUtils
+
 /**
   * Base class for all user-defined functions such as scalar functions, table 
functions,
   * or aggregation functions.
@@ -49,7 +49,7 @@ abstract class UserDefinedFunction extends Serializable {
   def isDeterministic: Boolean = true
 
   final def functionIdentifier: String = {
-    val md5 = DigestUtils.md5Hex(serialize(this))
+    val md5 = 
EncodingUtils.hex(EncodingUtils.md5(EncodingUtils.encodeObjectToString(this)))
     getClass.getCanonicalName.replace('.', '$').concat("$").concat(md5)
   }
 
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
index 9799e4d..c9a2703 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
@@ -19,28 +19,27 @@
 
 package org.apache.flink.table.functions.utils
 
-import java.util
-import java.lang.{Integer => JInt, Long => JLong}
 import java.lang.reflect.{Method, Modifier}
+import java.lang.{Integer => JInt, Long => JLong}
 import java.sql.{Date, Time, Timestamp}
+import java.util
 
-import org.apache.commons.codec.binary.Base64
 import com.google.common.primitives.Primitives
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.sql.`type`.SqlOperandTypeChecker.Consistency
 import org.apache.calcite.sql.`type`._
-import org.apache.calcite.sql.{SqlCallBinding, SqlFunction, 
SqlOperandCountRange, SqlOperator, SqlOperatorBinding}
+import org.apache.calcite.sql.{SqlCallBinding, SqlFunction, 
SqlOperandCountRange, SqlOperator}
 import org.apache.flink.api.common.functions.InvalidTypesException
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.api.java.typeutils.{PojoField, PojoTypeInfo, 
TypeExtractor}
 import org.apache.flink.table.api.dataview._
-import org.apache.flink.table.dataview._
-import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.api.{TableEnvironment, TableException, 
ValidationException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.dataview._
 import org.apache.flink.table.expressions._
-import org.apache.flink.table.plan.logical._
 import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, 
TableFunction, UserDefinedFunction}
+import org.apache.flink.table.plan.logical._
 import org.apache.flink.table.plan.schema.FlinkTableFunctionImpl
 import org.apache.flink.util.InstantiationUtil
 
@@ -734,19 +733,6 @@ object UserDefinedFunctionUtils {
       (candidate.getComponentType == expected.getComponentType ||
         expected.getComponentType == classOf[Object]))
 
-  @throws[Exception]
-  def serialize(function: UserDefinedFunction): String = {
-    val byteArray = InstantiationUtil.serializeObject(function)
-    Base64.encodeBase64URLSafeString(byteArray)
-  }
-
-  @throws[Exception]
-  def deserialize(data: String): UserDefinedFunction = {
-    val byteData = Base64.decodeBase64(data)
-    InstantiationUtil
-      .deserializeObject[UserDefinedFunction](byteData, 
Thread.currentThread.getContextClassLoader)
-  }
-
   /**
     * Creates a [[LogicalTableFunctionCall]] by parsing a String expression.
     *
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/TreeNode.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/TreeNode.scala
index fdf45e7..097346a 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/TreeNode.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/TreeNode.scala
@@ -17,7 +17,7 @@
  */
 package org.apache.flink.table.plan
 
-import org.apache.commons.lang.ClassUtils
+import org.apache.flink.table.typeutils.TypeCheckUtils
 
 /**
  * Generic base class for trees that can be transformed and traversed.
@@ -88,28 +88,28 @@ abstract class TreeNode[A <: TreeNode[A]] extends Product { 
self: A =>
    * if children change.
    */
   private[flink] def makeCopy(newArgs: Array[AnyRef]): A = {
-    val ctors = getClass.getConstructors.filter(_.getParameterTypes.size > 0)
+    val ctors = getClass.getConstructors.filter(_.getParameterTypes.length > 0)
     if (ctors.isEmpty) {
       throw new RuntimeException(s"No valid constructor for 
${getClass.getSimpleName}")
     }
 
     val defaultCtor = ctors.find { ctor =>
-      if (ctor.getParameterTypes.size != newArgs.length) {
+      if (ctor.getParameterTypes.length != newArgs.length) {
         false
       } else if (newArgs.contains(null)) {
         false
       } else {
         val argsClasses: Array[Class[_]] = newArgs.map(_.getClass)
-        ClassUtils.isAssignable(argsClasses, ctor.getParameterTypes)
+        TypeCheckUtils.isAssignable(argsClasses, ctor.getParameterTypes)
       }
-    }.getOrElse(ctors.maxBy(_.getParameterTypes.size))
+    }.getOrElse(ctors.maxBy(_.getParameterTypes.length))
 
     try {
       defaultCtor.newInstance(newArgs: _*).asInstanceOf[A]
     } catch {
       case e: Throwable =>
         throw new RuntimeException(
-          s"Fail to copy treeNode ${getClass.getName}: 
${e.getStackTraceString}")
+          s"Fail to copy tree node ${getClass.getName}.", e)
     }
   }
 }
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
index da7cf64..a2acb0c 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
@@ -19,12 +19,9 @@ package org.apache.flink.table.runtime.functions
 
 import java.lang.{StringBuilder, Long => JLong}
 import java.math.{BigDecimal => JBigDecimal}
-import java.nio.charset.StandardCharsets
-import java.util.regex.Matcher
-import java.util.regex.Pattern
+import java.util.regex.{Matcher, Pattern}
 
-import org.apache.commons.codec.binary.{Base64, Hex}
-import org.apache.commons.lang3.StringUtils
+import org.apache.flink.table.utils.EncodingUtils
 
 import scala.annotation.varargs
 
@@ -269,25 +266,25 @@ object ScalarFunctions {
   /**
     * Returns the base string decoded with base64.
     */
-  def fromBase64(str: String): String =
-    new String(Base64.decodeBase64(str), StandardCharsets.UTF_8)
+  def fromBase64(base64: String): String =
+    EncodingUtils.decodeBase64ToString(base64)
 
   /**
     * Returns the base64-encoded result of the input string.
     */
-  def toBase64(base: String): String =
-    Base64.encodeBase64String(base.getBytes(StandardCharsets.UTF_8))
+  def toBase64(string: String): String =
+    EncodingUtils.encodeStringToBase64(string)
 
   /**
     * Returns the hex string of a long argument.
     */
-  def hex(x: Long): String = JLong.toHexString(x).toUpperCase()
+  def hex(string: Long): String = JLong.toHexString(string).toUpperCase()
 
   /**
     * Returns the hex string of a string argument.
     */
-  def hex(x: String): String =
-    Hex.encodeHexString(x.getBytes(StandardCharsets.UTF_8)).toUpperCase()
+  def hex(string: String): String =
+    EncodingUtils.hex(string).toUpperCase()
 
   /**
     * Returns an UUID string using Java utilities.
@@ -297,6 +294,6 @@ object ScalarFunctions {
   /**
     * Returns a string that repeats the base string n times.
     */
-  def repeat(base: String, n: Int): String = StringUtils.repeat(base, n)
+  def repeat(base: String, n: Int): String = EncodingUtils.repeat(base, n)
 
 }
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
index 02af798..25367ad 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
@@ -183,4 +183,95 @@ object TypeCheckUtils {
       }
     }
   }
+
+  /**
+    * Checks if a class is a Java primitive wrapper.
+    */
+  def isPrimitiveWrapper(clazz: Class[_]): Boolean = {
+    clazz == classOf[java.lang.Boolean] ||
+    clazz == classOf[java.lang.Byte] ||
+    clazz == classOf[java.lang.Character] ||
+    clazz == classOf[java.lang.Short] ||
+    clazz == classOf[java.lang.Integer] ||
+    clazz == classOf[java.lang.Long] ||
+    clazz == classOf[java.lang.Double] ||
+    clazz == classOf[java.lang.Float]
+  }
+
+  /**
+    * Checks if each class in an array can be assigned to its counterpart in another array.
+    *
+    * Adapted from o.a.commons.lang.ClassUtils#isAssignable(java.lang.Class[], 
java.lang.Class[])
+    * but without null checks.
+    */
+  def isAssignable(classArray: Array[Class[_]], toClassArray: 
Array[Class[_]]): Boolean = {
+    if (classArray.length != toClassArray.length) {
+      return false
+    }
+    var i = 0
+    while (i < classArray.length) {
+      if (!isAssignable(classArray(i), toClassArray(i))) {
+        return false
+      }
+      i += 1
+    }
+    true
+  }
+
+  /**
+    * Checks if one class can be assigned to a variable of another class.
+    *
+    * Adapted from o.a.commons.lang.ClassUtils#isAssignable(java.lang.Class, 
java.lang.Class) but
+    * without null checks.
+    */
+  def isAssignable(cls: Class[_], toClass: Class[_]): Boolean = {
+    if (cls.equals(toClass)) {
+      return true
+    }
+    if (cls.isPrimitive) {
+      if (!toClass.isPrimitive) {
+        return false
+      }
+      if (java.lang.Integer.TYPE.equals(cls)) {
+        return java.lang.Long.TYPE.equals(toClass) ||
+          java.lang.Float.TYPE.equals(toClass) ||
+          java.lang.Double.TYPE.equals(toClass)
+      }
+      if (java.lang.Long.TYPE.equals(cls)) {
+        return java.lang.Float.TYPE.equals(toClass) ||
+          java.lang.Double.TYPE.equals(toClass)
+      }
+      if (java.lang.Boolean.TYPE.equals(cls)) {
+        return false
+      }
+      if (java.lang.Double.TYPE.equals(cls)) {
+        return false
+      }
+      if (java.lang.Float.TYPE.equals(cls)) {
+          return java.lang.Double.TYPE.equals(toClass)
+      }
+      if (java.lang.Character.TYPE.equals(cls)) {
+          return java.lang.Integer.TYPE.equals(toClass) ||
+            java.lang.Long.TYPE.equals(toClass) ||
+            java.lang.Float.TYPE.equals(toClass) ||
+            java.lang.Double.TYPE.equals(toClass)
+      }
+      if (java.lang.Short.TYPE.equals(cls)) {
+          return java.lang.Integer.TYPE.equals(toClass) ||
+            java.lang.Long.TYPE.equals(toClass) ||
+            java.lang.Float.TYPE.equals(toClass) ||
+            java.lang.Double.TYPE.equals(toClass)
+      }
+      if (java.lang.Byte.TYPE.equals(cls)) {
+          return java.lang.Short.TYPE.equals(toClass) ||
+            java.lang.Integer.TYPE.equals(toClass) ||
+            java.lang.Long.TYPE.equals(toClass) ||
+            java.lang.Float.TYPE.equals(toClass) ||
+            java.lang.Double.TYPE.equals(toClass)
+      }
+      // should never get here
+      return false
+    }
+    toClass.isAssignableFrom(cls)
+  }
 }
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
index 28b7d14..f70d991 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
@@ -30,12 +30,12 @@ import 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord
 import 
org.apache.flink.streaming.util.{KeyedOneInputStreamOperatorTestHarness, 
TestHarnessUtil}
 import org.apache.flink.table.api.StreamQueryConfig
 import org.apache.flink.table.codegen.GeneratedAggregationsFunction
-import org.apache.flink.table.functions.AggregateFunction
-import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
+import org.apache.flink.table.functions.{AggregateFunction, 
UserDefinedFunction}
 import 
org.apache.flink.table.functions.aggfunctions.{IntSumWithRetractAggFunction, 
LongMaxWithRetractAggFunction, LongMinWithRetractAggFunction}
 import 
org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.getAccumulatorTypeOfAggregateFunction
 import 
org.apache.flink.table.runtime.harness.HarnessTestBase.{RowResultSortComparator,
 RowResultSortComparatorWithWatermarks}
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.table.utils.EncodingUtils
 import org.junit.Rule
 import org.junit.rules.ExpectedException
 
@@ -47,13 +47,13 @@ class HarnessTestBase {
   def thrown = expectedException
 
   val longMinWithRetractAggFunction: String =
-    UserDefinedFunctionUtils.serialize(new LongMinWithRetractAggFunction)
+    EncodingUtils.encodeObjectToString(new LongMinWithRetractAggFunction)
 
   val longMaxWithRetractAggFunction: String =
-    UserDefinedFunctionUtils.serialize(new LongMaxWithRetractAggFunction)
+    EncodingUtils.encodeObjectToString(new LongMaxWithRetractAggFunction)
 
   val intSumWithRetractAggFunction: String =
-    UserDefinedFunctionUtils.serialize(new IntSumWithRetractAggFunction)
+    EncodingUtils.encodeObjectToString(new IntSumWithRetractAggFunction)
 
   protected val MinMaxRowType = new RowTypeInfo(Array[TypeInformation[_]](
     LONG_TYPE_INFO,
@@ -97,12 +97,14 @@ class HarnessTestBase {
       |  public MinMaxAggregateHelper() throws Exception {
       |
       |    fmin = 
(org.apache.flink.table.functions.aggfunctions.LongMinWithRetractAggFunction)
-      |    org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
-      |    .deserialize("$longMinWithRetractAggFunction");
+      |    ${classOf[EncodingUtils].getCanonicalName}.decodeStringToObject(
+      |      "$longMinWithRetractAggFunction",
+      |      ${classOf[UserDefinedFunction].getCanonicalName}.class);
       |
       |    fmax = 
(org.apache.flink.table.functions.aggfunctions.LongMaxWithRetractAggFunction)
-      |    org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
-      |    .deserialize("$longMaxWithRetractAggFunction");
+      |    ${classOf[EncodingUtils].getCanonicalName}.decodeStringToObject(
+      |      "$longMaxWithRetractAggFunction",
+      |      ${classOf[UserDefinedFunction].getCanonicalName}.class);
       |  }
       |
       |  public void setAggregationResults(
@@ -220,8 +222,9 @@ class HarnessTestBase {
       |  public SumAggregationHelper() throws Exception {
       |
       |sum = 
(org.apache.flink.table.functions.aggfunctions.IntSumWithRetractAggFunction)
-      |org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
-      |.deserialize("$intSumWithRetractAggFunction");
+      |${classOf[EncodingUtils].getCanonicalName}.decodeStringToObject(
+      |  "$intSumWithRetractAggFunction",
+      |  ${classOf[UserDefinedFunction].getCanonicalName}.class);
       |}
       |
       |  public final void setAggregationResults(
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeCheckUtilsTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeCheckUtilsTest.scala
index 645e608..8f5ff54 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeCheckUtilsTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeCheckUtilsTest.scala
@@ -22,6 +22,7 @@ import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.typeutils.{Types => ScalaTypes}
 import org.apache.flink.table.api.{Types, ValidationException}
 import org.apache.flink.table.typeutils.TypeCheckUtils.validateEqualsHashCode
+import org.junit.Assert.{assertFalse, assertTrue}
 import org.junit.Test
 
 class TypeCheckUtilsTest {
@@ -51,4 +52,24 @@ class TypeCheckUtilsTest {
   def testInvalidType3(): Unit = {
     validateEqualsHashCode("", Types.OBJECT_ARRAY[Nothing](ScalaTypes.NOTHING))
   }
+
+  @Test
+  def testPrimitiveWrapper (): Unit = {
+    assertTrue(TypeCheckUtils.isPrimitiveWrapper(classOf[java.lang.Double]))
+    assertFalse(TypeCheckUtils.isPrimitiveWrapper(classOf[Double]))
+  }
+
+  @Test
+  def testAssignability(): Unit = {
+    assertTrue(TypeCheckUtils.isAssignable(classOf[Double], classOf[Double]))
+    assertFalse(TypeCheckUtils.isAssignable(classOf[Boolean], classOf[Double]))
+    assertTrue(TypeCheckUtils.isAssignable(
+      classOf[java.util.HashMap[_, _]], classOf[java.util.Map[_, _]]))
+    assertFalse(TypeCheckUtils.isAssignable(
+      classOf[java.util.Map[_, _]], classOf[java.util.HashMap[_, _]]))
+
+    assertTrue(TypeCheckUtils.isAssignable(
+      Array[Class[_]](classOf[Double], classOf[Double]),
+      Array[Class[_]](classOf[Double], classOf[Double])))
+  }
 }

Reply via email to