This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 08d875e [FLINK-10166] [table] Reduce dependencies by removing org.apache.commons
08d875e is described below
commit 08d875eadec779f68baa1cc7003dab7bdd51640a
Author: Timo Walther <[email protected]>
AuthorDate: Mon Oct 29 13:25:54 2018 +0100
[FLINK-10166] [table] Reduce dependencies by removing org.apache.commons
This commit removes all dependencies on org.apache.commons libraries in
flink-table. In the past we only used a couple of methods that were
partially pulled in from Hadoop, causing the issues mentioned in the
JIRA ticket.
This closes #6966.
---
.../table/descriptors/DescriptorProperties.java | 2 +-
.../apache/flink/table/utils/EncodingUtils.java | 151 +++++++++++++++++++--
.../flink/table/utils/EncodingUtilsTest.java | 93 +++++++++++++
flink-libraries/flink-table/pom.xml | 21 ---
.../table/codegen/AggregationCodeGenerator.scala | 18 +--
.../apache/flink/table/codegen/CodeGenerator.scala | 12 +-
.../flink/table/codegen/MatchCodeGenerator.scala | 10 +-
.../table/codegen/calls/HashCalcCallGen.scala | 10 +-
.../codegen/calls/ScalarFunctionCallGen.scala | 3 +-
.../table/codegen/calls/TableFunctionCallGen.scala | 3 +-
.../apache/flink/table/expressions/literals.scala | 7 +-
.../table/functions/UserDefinedFunction.scala | 6 +-
.../functions/utils/UserDefinedFunctionUtils.scala | 26 +---
.../org/apache/flink/table/plan/TreeNode.scala | 12 +-
.../table/runtime/functions/ScalarFunctions.scala | 23 ++--
.../flink/table/typeutils/TypeCheckUtils.scala | 91 +++++++++++++
.../table/runtime/harness/HarnessTestBase.scala | 25 ++--
.../flink/table/typeutils/TypeCheckUtilsTest.scala | 21 +++
18 files changed, 416 insertions(+), 118 deletions(-)
diff --git
a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java
b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java
index 0d2f835..c476981 100644
---
a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java
+++
b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java
@@ -1326,7 +1326,7 @@ public class DescriptorProperties {
}
public static String toString(String str) {
- return EncodingUtils.escapeJava(str).replace("\\/", "/"); //
'/' must not be escaped
+ return EncodingUtils.escapeJava(str);
}
public static String toString(String key, String value) {
diff --git
a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/utils/EncodingUtils.java
b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/utils/EncodingUtils.java
index 41fa58e..47aac25 100644
---
a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/utils/EncodingUtils.java
+++
b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/utils/EncodingUtils.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.utils;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.util.InstantiationUtil;
@@ -26,9 +27,12 @@ import java.io.IOException;
import java.io.Serializable;
import java.io.StringWriter;
import java.io.Writer;
-import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
import java.util.Base64;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
/**
* General utilities for string-encoding. This class is used to avoid
additional dependencies
* to other projects.
@@ -40,6 +44,8 @@ public abstract class EncodingUtils {
private static final Base64.Decoder BASE64_DECODER =
java.util.Base64.getUrlDecoder();
+ private static final char[] HEX_CHARS =
"0123456789abcdef".toCharArray();
+
private EncodingUtils() {
// do not instantiate
}
@@ -47,7 +53,7 @@ public abstract class EncodingUtils {
public static String encodeObjectToString(Serializable obj) {
try {
final byte[] bytes =
InstantiationUtil.serializeObject(obj);
- return new String(BASE64_ENCODER.encode(bytes),
StandardCharsets.UTF_8);
+ return new String(BASE64_ENCODER.encode(bytes), UTF_8);
} catch (Exception e) {
throw new ValidationException(
"Unable to serialize object '" + obj.toString()
+ "' of class '" + obj.getClass().getName() + "'.");
@@ -60,7 +66,7 @@ public abstract class EncodingUtils {
public static <T extends Serializable> T decodeStringToObject(String
base64String, Class<T> baseClass, ClassLoader classLoader) {
try {
- final byte[] bytes =
BASE64_DECODER.decode(base64String.getBytes(StandardCharsets.UTF_8));
+ final byte[] bytes =
BASE64_DECODER.decode(base64String.getBytes(UTF_8));
final T instance =
InstantiationUtil.deserializeObject(bytes, classLoader);
if (instance != null &&
!baseClass.isAssignableFrom(instance.getClass())) {
throw new ValidationException(
@@ -87,10 +93,138 @@ public abstract class EncodingUtils {
return loadClass(qualifiedName,
Thread.currentThread().getContextClassLoader());
}
+ public static String encodeStringToBase64(String string) {
+ return new
String(java.util.Base64.getEncoder().encode(string.getBytes(UTF_8)), UTF_8);
+ }
+
+ public static String decodeBase64ToString(String base64) {
+ return new
String(java.util.Base64.getDecoder().decode(base64.getBytes(UTF_8)), UTF_8);
+ }
+
+ public static byte[] md5(String string) {
+ try {
+ return
MessageDigest.getInstance("MD5").digest(string.getBytes(UTF_8));
+ } catch (NoSuchAlgorithmException e) {
+ throw new TableException("Unsupported MD5 algorithm.",
e);
+ }
+ }
+
+ public static String hex(String string) {
+ return hex(string.getBytes(UTF_8));
+ }
+
+ public static String hex(byte[] bytes) {
+ // adopted from https://stackoverflow.com/a/9855338
+ final char[] hexChars = new char[bytes.length * 2];
+ for (int j = 0; j < bytes.length; j++) {
+ final int v = bytes[j] & 0xFF;
+ hexChars[j * 2] = HEX_CHARS[v >>> 4];
+ hexChars[j * 2 + 1] = HEX_CHARS[v & 0x0F];
+ }
+ return new String(hexChars);
+ }
+
+ //
--------------------------------------------------------------------------------------------
+ // Java String Repetition
+ //
+ // copied from o.a.commons.lang3.StringUtils (commons-lang3:3.3.2)
+ //
--------------------------------------------------------------------------------------------
+
+ private static final String EMPTY = "";
+
+ /**
+ * The maximum size to which the padding constant(s) can expand.
+ */
+ private static final int PAD_LIMIT = 8192;
+
+ /**
+ * Repeat a String {@code repeat} times to form a new String.
+ *
+ * <pre>
+ * StringUtils.repeat(null, 2) = null
+ * StringUtils.repeat("", 0) = ""
+ * StringUtils.repeat("", 2) = ""
+ * StringUtils.repeat("a", 3) = "aaa"
+ * StringUtils.repeat("ab", 2) = "abab"
+ * StringUtils.repeat("a", -2) = ""
+ * </pre>
+ *
+ * @param str the String to repeat, may be null
+ * @param repeat number of times to repeat str, negative treated as zero
+ * @return a new String consisting of the original String repeated,
{@code null} if null String input
+ */
+ public static String repeat(final String str, final int repeat) {
+ // Performance tuned for 2.0 (JDK1.4)
+
+ if (str == null) {
+ return null;
+ }
+ if (repeat <= 0) {
+ return EMPTY;
+ }
+ final int inputLength = str.length();
+ if (repeat == 1 || inputLength == 0) {
+ return str;
+ }
+ if (inputLength == 1 && repeat <= PAD_LIMIT) {
+ return repeat(str.charAt(0), repeat);
+ }
+
+ final int outputLength = inputLength * repeat;
+ switch (inputLength) {
+ case 1:
+ return repeat(str.charAt(0), repeat);
+ case 2:
+ final char ch0 = str.charAt(0);
+ final char ch1 = str.charAt(1);
+ final char[] output2 = new char[outputLength];
+ for (int i = repeat * 2 - 2; i >= 0; i--, i--) {
+ output2[i] = ch0;
+ output2[i + 1] = ch1;
+ }
+ return new String(output2);
+ default:
+ final StringBuilder buf = new
StringBuilder(outputLength);
+ for (int i = 0; i < repeat; i++) {
+ buf.append(str);
+ }
+ return buf.toString();
+ }
+ }
+
+ /**
+ * Returns padding using the specified delimiter repeated to a given
length.
+ *
+ * <pre>
+ * StringUtils.repeat('e', 0) = ""
+ * StringUtils.repeat('e', 3) = "eee"
+ * StringUtils.repeat('e', -2) throws NegativeArraySizeException
+ * </pre>
+ *
+ * <p>Note: this method does not support padding with
+ * <a
href="http://www.unicode.org/glossary/#supplementary_character">Unicode
Supplementary Characters</a>
+ * as they require a pair of {@code char}s to be represented.
+ * If you need to support full I18N in your applications,
+ * consider using {@link #repeat(String, int)} instead.
+ *
+ * @param ch character to repeat
+ * @param repeat number of times to repeat char, must not be
negative
+ * @return String with repeated character
+ * @see #repeat(String, int)
+ */
+ public static String repeat(final char ch, final int repeat) {
+ final char[] buf = new char[repeat];
+ for (int i = repeat - 1; i >= 0; i--) {
+ buf[i] = ch;
+ }
+ return new String(buf);
+ }
+
//
--------------------------------------------------------------------------------------------
// Java String Escaping
//
- // copied from o.a.commons.lang.StringEscapeUtils
(commons-lang:commons-lang:2.4)
+ // copied from o.a.commons.lang.StringEscapeUtils (commons-lang:2.4)
+ // but without escaping forward slashes.
//
--------------------------------------------------------------------------------------------
/**
@@ -197,10 +331,11 @@ public abstract class EncodingUtils {
out.write('\\');
out.write('\\');
break;
- case '/':
- out.write('\\');
- out.write('/');
- break;
+ // MODIFICATION: Flink removes invalid
escaping of forward slashes!
+ // case '/':
+ // out.write('\\');
+ // out.write('/');
+ // break;
default:
out.write(ch);
break;
diff --git
a/flink-libraries/flink-table-common/src/test/java/org/apache/flink/table/utils/EncodingUtilsTest.java
b/flink-libraries/flink-table-common/src/test/java/org/apache/flink/table/utils/EncodingUtilsTest.java
new file mode 100644
index 0000000..3bc53d8
--- /dev/null
+++
b/flink-libraries/flink-table-common/src/test/java/org/apache/flink/table/utils/EncodingUtilsTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils;
+
+import org.junit.Test;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link org.apache.flink.table.utils.EncodingUtils}.
+ */
+public class EncodingUtilsTest {
+
+ @Test
+ public void testObjectStringEncoding() {
+ final MyPojo pojo = new MyPojo(33, "Hello");
+ final String base64 = EncodingUtils.encodeObjectToString(pojo);
+ assertEquals(pojo, EncodingUtils.decodeStringToObject(base64,
Serializable.class));
+ }
+
+ @Test
+ public void testStringBase64Encoding() {
+ final String string = "Hello, this is apache flink.";
+ final String base64 =
EncodingUtils.encodeStringToBase64(string);
+ assertEquals("SGVsbG8sIHRoaXMgaXMgYXBhY2hlIGZsaW5rLg==",
base64);
+ assertEquals(string,
EncodingUtils.decodeBase64ToString(base64));
+ }
+
+ @Test
+ public void testMd5Hex() {
+ final String string = "Hello, world! How are you? 高精确";
+ assertEquals("983abac84e994b4ba73be177e5cc298b",
EncodingUtils.hex(EncodingUtils.md5(string)));
+ }
+
+ @Test
+ public void testJavaEscaping() {
+ assertEquals("\\\\hello\\\"world'space/",
EncodingUtils.escapeJava("\\hello\"world'space/"));
+ }
+
+ @Test
+ public void testRepetition() {
+ assertEquals("wewewe", EncodingUtils.repeat("we", 3));
+ }
+
+ //
--------------------------------------------------------------------------------------------
+
+ private static class MyPojo implements Serializable {
+
+ private int number;
+ private String string;
+
+ public MyPojo(int number, String string) {
+ this.number = number;
+ this.string = string;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ MyPojo myPojo = (MyPojo) o;
+ return number == myPojo.number &&
Objects.equals(string, myPojo.string);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(number, string);
+ }
+ }
+}
diff --git a/flink-libraries/flink-table/pom.xml
b/flink-libraries/flink-table/pom.xml
index 1465371..6b2fe8c 100644
--- a/flink-libraries/flink-table/pom.xml
+++ b/flink-libraries/flink-table/pom.xml
@@ -78,18 +78,6 @@ under the License.
<scope>provided</scope>
</dependency>
- <!-- Used for base64 encoding of UDFs -->
- <dependency>
- <groupId>commons-codec</groupId>
- <artifactId>commons-codec</artifactId>
- </dependency>
-
- <!-- Used for code generation -->
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-lang3</artifactId>
- </dependency>
-
<!-- Used for code generation -->
<dependency>
<groupId>org.apache.flink</groupId>
@@ -279,12 +267,9 @@ under the License.
<include>com.google.guava:guava</include>
<include>net.hydromatic:*</include>
<include>com.esri.geometry:*</include>
-
<include>commons-lang:*</include>
<!--
flink-table dependencies -->
<include>org.apache.flink:flink-table-common</include>
-
<include>commons-codec:*</include>
-
<include>org.apache.commons:commons-lang3</include>
<include>org.codehaus.janino:*</include>
<include>joda-time:*</include>
</includes>
@@ -320,12 +305,6 @@ under the License.
<pattern>org.codehaus</pattern>
<shadedPattern>org.apache.flink.table.shaded.org.codehaus</shadedPattern>
</relocation>-->
-
- <!--
commons-codec, commons-lang3, and commons-lang (from Calcite) -->
- <relocation>
-
<pattern>org.apache.commons</pattern>
-
<shadedPattern>org.apache.flink.table.shaded.org.apache.commons</shadedPattern>
- </relocation>
</relocations>
</configuration>
</execution>
diff --git
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
index 11c0008..566e3d7 100644
---
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
+++
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
@@ -21,7 +21,6 @@ import java.lang.reflect.Modifier
import java.lang.{Iterable => JIterable}
import org.apache.calcite.rex.RexLiteral
-import org.apache.commons.codec.binary.Base64
import org.apache.flink.api.common.state.{ListStateDescriptor,
MapStateDescriptor, State, StateDescriptor}
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.RowTypeInfo
@@ -36,8 +35,8 @@ import
org.apache.flink.table.functions.aggfunctions.DistinctAccumulator
import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
import
org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.{getUserDefinedMethod,
signatureToString}
import org.apache.flink.table.runtime.aggregate.{GeneratedAggregations,
SingleElementIterable}
+import org.apache.flink.table.utils.EncodingUtils
import org.apache.flink.types.Row
-import org.apache.flink.util.InstantiationUtil
import scala.collection.mutable
@@ -315,7 +314,7 @@ class AggregationCodeGenerator(
val dataViewTypeTerm = dataViewField.getType.getCanonicalName
// define the DataView variables
- val serializedData = serializeStateDescriptor(desc)
+ val serializedData = EncodingUtils.encodeObjectToString(desc)
val dataViewFieldTerm = createDataViewTerm(aggIndex,
dataViewField.getName)
val field =
s"""
@@ -329,9 +328,10 @@ class AggregationCodeGenerator(
val descDeserializeCode =
s"""
| $descClassQualifier $descFieldTerm = ($descClassQualifier)
- | org.apache.flink.util.InstantiationUtil.deserializeObject(
- |
org.apache.commons.codec.binary.Base64.decodeBase64("$serializedData"),
- | $contextTerm.getUserCodeClassLoader());
+ |
${classOf[EncodingUtils].getCanonicalName}.decodeStringToObject(
+ | "$serializedData",
+ | $descClassQualifier.class,
+ | $contextTerm.getUserCodeClassLoader());
|""".stripMargin
val createDataView = if (dataViewField.getType == classOf[MapView[_,
_]]) {
s"""
@@ -770,10 +770,4 @@ class AggregationCodeGenerator(
GeneratedAggregationsFunction(funcName, funcCode)
}
-
- @throws[Exception]
- def serializeStateDescriptor(stateDescriptor: StateDescriptor[_, _]): String
= {
- val byteArray = InstantiationUtil.serializeObject(stateDescriptor)
- Base64.encodeBase64URLSafeString(byteArray)
- }
}
diff --git
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
index 5d047ad..13bf50a 100644
---
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
+++
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
@@ -26,7 +26,6 @@ import org.apache.calcite.sql.SqlOperator
import org.apache.calcite.sql.`type`.SqlTypeName._
import org.apache.calcite.sql.`type`.{ReturnTypes, SqlTypeName}
import org.apache.calcite.sql.fun.SqlStdOperatorTable.{ROW, _}
-import org.apache.commons.lang3.StringEscapeUtils
import org.apache.flink.api.common.functions._
import org.apache.flink.api.common.typeinfo._
import org.apache.flink.api.common.typeutils.CompositeType
@@ -40,10 +39,10 @@ import
org.apache.flink.table.codegen.GeneratedExpression.{ALWAYS_NULL, NEVER_NU
import org.apache.flink.table.codegen.calls.ScalarOperators._
import org.apache.flink.table.codegen.calls.{CurrentTimePointCallGen,
FunctionGenerator}
import org.apache.flink.table.functions.sql.{ProctimeSqlFunction,
ScalarSqlFunctions, StreamRecordTimestampSqlFunction}
-import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
import org.apache.flink.table.functions.{FunctionContext, UserDefinedFunction}
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.table.utils.EncodingUtils
import org.joda.time.format.DateTimeFormatter
import scala.collection.JavaConversions._
@@ -681,7 +680,7 @@ abstract class CodeGenerator(
generateNonNullLiteral(resultType, decimalField)
case VARCHAR | CHAR =>
- val escapedValue =
StringEscapeUtils.ESCAPE_JAVA.translate(value.toString)
+ val escapedValue = EncodingUtils.escapeJava(value.toString)
generateNonNullLiteral(resultType, "\"" + escapedValue + "\"")
case SYMBOL =>
@@ -1610,7 +1609,7 @@ abstract class CodeGenerator(
*/
def addReusableFunction(function: UserDefinedFunction, contextTerm: String =
null): String = {
val classQualifier = function.getClass.getCanonicalName
- val functionSerializedData = UserDefinedFunctionUtils.serialize(function)
+ val functionSerializedData = EncodingUtils.encodeObjectToString(function)
val fieldTerm = s"function_${function.functionIdentifier}"
val fieldFunction =
@@ -1622,8 +1621,9 @@ abstract class CodeGenerator(
val functionDeserialization =
s"""
|$fieldTerm = ($classQualifier)
- |${UserDefinedFunctionUtils.getClass.getName.stripSuffix("$")}
- |.deserialize("$functionSerializedData");
+ |${classOf[EncodingUtils].getCanonicalName}.decodeStringToObject(
+ | "$functionSerializedData",
+ | ${classOf[UserDefinedFunction].getCanonicalName}.class);
""".stripMargin
reusableInitStatements.add(functionDeserialization)
diff --git
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
index a68bf8e..791d388 100644
---
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
+++
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
@@ -23,7 +23,6 @@ import java.util
import org.apache.calcite.rex._
import org.apache.calcite.sql.fun.SqlStdOperatorTable._
-import org.apache.commons.lang3.StringEscapeUtils.escapeJava
import org.apache.flink.api.common.functions._
import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
import org.apache.flink.cep.pattern.conditions.IterativeCondition
@@ -33,6 +32,7 @@ import
org.apache.flink.table.codegen.CodeGenUtils.{boxedTypeTermForTypeInfo, ne
import org.apache.flink.table.codegen.GeneratedExpression.{NEVER_NULL, NO_CODE}
import org.apache.flink.table.codegen.Indenter.toISC
import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.utils.EncodingUtils
import org.apache.flink.util.Collector
import org.apache.flink.util.MathUtils.checkedDownCast
@@ -102,7 +102,7 @@ class MatchCodeGenerator(
private def addReusablePatternNames() : Unit = {
reusableMemberStatements
.add(s"private String[] $patternNamesTerm = new String[] { ${
- patternNames.map(p => s""""${escapeJava(p)}"""").mkString(", ")
+ patternNames.map(p =>
s""""${EncodingUtils.escapeJava(p)}"""").mkString(", ")
} };")
}
@@ -336,7 +336,7 @@ class MatchCodeGenerator(
|}
""".stripMargin
} else {
- val escapedPatternName = escapeJava(patternName)
+ val escapedPatternName = EncodingUtils.escapeJava(patternName)
j"""
|java.util.List $listName = new java.util.ArrayList();
|for ($eventTypeTerm $eventNameTerm :
@@ -373,7 +373,7 @@ class MatchCodeGenerator(
|}
""".stripMargin
} else {
- val escapedPatternName = escapeJava(patternName)
+ val escapedPatternName = EncodingUtils.escapeJava(patternName)
j"""
|java.util.List $listName = (java.util.List)
$input1Term.get("$escapedPatternName");
|if ($listName == null) {
@@ -427,7 +427,7 @@ class MatchCodeGenerator(
}
private def generatePatternFieldRef(fieldRef: RexPatternFieldRef):
GeneratedExpression = {
- val escapedAlpha = escapeJava(fieldRef.getAlpha)
+ val escapedAlpha = EncodingUtils.escapeJava(fieldRef.getAlpha)
val patternVariableRef = reusableInputUnboxingExprs
.get((s"$escapedAlpha#$first", offset)) match {
// input access and unboxing has already been generated
diff --git
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/HashCalcCallGen.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/HashCalcCallGen.scala
index d3be6e1..1ce8af8 100644
---
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/HashCalcCallGen.scala
+++
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/HashCalcCallGen.scala
@@ -18,12 +18,13 @@
package org.apache.flink.table.codegen.calls
-import org.apache.commons.codec.Charsets
-import org.apache.commons.codec.binary.Hex
+import java.nio.charset.StandardCharsets
+
import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
import org.apache.flink.table.codegen.CodeGenUtils.newName
import
org.apache.flink.table.codegen.calls.CallGenerator.generateCallWithStmtIfArgsNotNull
import org.apache.flink.table.codegen.{CodeGenerator, GeneratedExpression}
+import org.apache.flink.table.utils.EncodingUtils
class HashCalcCallGen(algName: String) extends CallGenerator {
@@ -68,9 +69,10 @@ class HashCalcCallGen(algName: String) extends CallGenerator
{
val auxiliaryStmt =
s"""
|${initStmt.getOrElse("")}
-
|$md.update(${terms.head}.getBytes(${classOf[Charsets].getCanonicalName}.UTF_8));
+ |$md.update(${terms.head}
+ | .getBytes(${classOf[StandardCharsets].getCanonicalName}.UTF_8));
|""".stripMargin
- val result =
s"${classOf[Hex].getCanonicalName}.encodeHexString($md.digest())"
+ val result =
s"${classOf[EncodingUtils].getCanonicalName}.hex($md.digest())"
(Some(auxiliaryStmt), result)
}
}
diff --git
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
index df206de..b91ab5a 100644
---
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
+++
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
@@ -18,7 +18,6 @@
package org.apache.flink.table.codegen.calls
-import org.apache.commons.lang3.ClassUtils
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.codegen.CodeGenUtils._
import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator,
GeneratedExpression}
@@ -68,7 +67,7 @@ class ScalarFunctionCallGen(
val parameters = paramClasses.zip(operands).map { case (paramClass,
operandExpr) =>
if (paramClass.isPrimitive) {
operandExpr
- } else if (ClassUtils.isPrimitiveWrapper(paramClass)
+ } else if (TypeCheckUtils.isPrimitiveWrapper(paramClass)
&& TypeCheckUtils.isTemporal(operandExpr.resultType)) {
// we use primitives to represent temporal types internally, so no
casting needed here
val exprOrNull: String = if (codeGenerator.nullCheck) {
diff --git
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TableFunctionCallGen.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TableFunctionCallGen.scala
index e1ad18f..6bcfc6e 100644
---
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TableFunctionCallGen.scala
+++
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TableFunctionCallGen.scala
@@ -18,7 +18,6 @@
package org.apache.flink.table.codegen.calls
-import org.apache.commons.lang3.ClassUtils
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.codegen.CodeGenUtils._
import org.apache.flink.table.codegen.GeneratedExpression.NEVER_NULL
@@ -68,7 +67,7 @@ class TableFunctionCallGen(
val parameters = paramClasses.zip(operands).map { case (paramClass,
operandExpr) =>
if (paramClass.isPrimitive) {
operandExpr
- } else if (ClassUtils.isPrimitiveWrapper(paramClass)
+ } else if (TypeCheckUtils.isPrimitiveWrapper(paramClass)
&& TypeCheckUtils.isTemporal(operandExpr.resultType)) {
// we use primitives to represent temporal types internally, so no
casting needed here
val exprOrNull: String = if (codeGenerator.nullCheck) {
diff --git
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala
index 419c125..24bae90 100644
---
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala
+++
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala
@@ -17,6 +17,9 @@
*/
package org.apache.flink.table.expressions
+import java.sql.{Date, Time, Timestamp}
+import java.util.{Calendar, TimeZone}
+
import org.apache.calcite.avatica.util.TimeUnit
import org.apache.calcite.rex.RexNode
import org.apache.calcite.sql.SqlIntervalQualifier
@@ -27,10 +30,6 @@ import org.apache.calcite.util.{DateString, TimeString,
TimestampString}
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo,
TypeInformation}
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.typeutils.{RowIntervalTypeInfo,
TimeIntervalTypeInfo}
-import java.sql.{Date, Time, Timestamp}
-import java.util.{Calendar, TimeZone}
-
-import org.apache.commons.lang3.StringEscapeUtils
object Literal {
private[flink] val UTC = TimeZone.getTimeZone("UTC")
diff --git
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala
index 15bcb17..89ba0d4 100644
---
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala
+++
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala
@@ -17,8 +17,8 @@
*/
package org.apache.flink.table.functions
-import org.apache.commons.codec.digest.DigestUtils
-import
org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.serialize
+import org.apache.flink.table.utils.EncodingUtils
+
/**
* Base class for all user-defined functions such as scalar functions, table
functions,
* or aggregation functions.
@@ -49,7 +49,7 @@ abstract class UserDefinedFunction extends Serializable {
def isDeterministic: Boolean = true
final def functionIdentifier: String = {
- val md5 = DigestUtils.md5Hex(serialize(this))
+ val md5 =
EncodingUtils.hex(EncodingUtils.md5(EncodingUtils.encodeObjectToString(this)))
getClass.getCanonicalName.replace('.', '$').concat("$").concat(md5)
}
diff --git
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
index 9799e4d..c9a2703 100644
---
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
+++
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
@@ -19,28 +19,27 @@
package org.apache.flink.table.functions.utils
-import java.util
-import java.lang.{Integer => JInt, Long => JLong}
import java.lang.reflect.{Method, Modifier}
+import java.lang.{Integer => JInt, Long => JLong}
import java.sql.{Date, Time, Timestamp}
+import java.util
-import org.apache.commons.codec.binary.Base64
import com.google.common.primitives.Primitives
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.sql.`type`.SqlOperandTypeChecker.Consistency
import org.apache.calcite.sql.`type`._
-import org.apache.calcite.sql.{SqlCallBinding, SqlFunction,
SqlOperandCountRange, SqlOperator, SqlOperatorBinding}
+import org.apache.calcite.sql.{SqlCallBinding, SqlFunction,
SqlOperandCountRange, SqlOperator}
import org.apache.flink.api.common.functions.InvalidTypesException
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.api.java.typeutils.{PojoField, PojoTypeInfo,
TypeExtractor}
import org.apache.flink.table.api.dataview._
-import org.apache.flink.table.dataview._
-import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.api.{TableEnvironment, TableException,
ValidationException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.dataview._
import org.apache.flink.table.expressions._
-import org.apache.flink.table.plan.logical._
import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction,
TableFunction, UserDefinedFunction}
+import org.apache.flink.table.plan.logical._
import org.apache.flink.table.plan.schema.FlinkTableFunctionImpl
import org.apache.flink.util.InstantiationUtil
@@ -734,19 +733,6 @@ object UserDefinedFunctionUtils {
(candidate.getComponentType == expected.getComponentType ||
expected.getComponentType == classOf[Object]))
- @throws[Exception]
- def serialize(function: UserDefinedFunction): String = {
- val byteArray = InstantiationUtil.serializeObject(function)
- Base64.encodeBase64URLSafeString(byteArray)
- }
-
- @throws[Exception]
- def deserialize(data: String): UserDefinedFunction = {
- val byteData = Base64.decodeBase64(data)
- InstantiationUtil
- .deserializeObject[UserDefinedFunction](byteData,
Thread.currentThread.getContextClassLoader)
- }
-
/**
* Creates a [[LogicalTableFunctionCall]] by parsing a String expression.
*
diff --git
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/TreeNode.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/TreeNode.scala
index fdf45e7..097346a 100644
---
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/TreeNode.scala
+++
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/TreeNode.scala
@@ -17,7 +17,7 @@
*/
package org.apache.flink.table.plan
-import org.apache.commons.lang.ClassUtils
+import org.apache.flink.table.typeutils.TypeCheckUtils
/**
* Generic base class for trees that can be transformed and traversed.
@@ -88,28 +88,28 @@ abstract class TreeNode[A <: TreeNode[A]] extends Product {
self: A =>
* if children change.
*/
private[flink] def makeCopy(newArgs: Array[AnyRef]): A = {
- val ctors = getClass.getConstructors.filter(_.getParameterTypes.size > 0)
+ val ctors = getClass.getConstructors.filter(_.getParameterTypes.length > 0)
if (ctors.isEmpty) {
throw new RuntimeException(s"No valid constructor for
${getClass.getSimpleName}")
}
val defaultCtor = ctors.find { ctor =>
- if (ctor.getParameterTypes.size != newArgs.length) {
+ if (ctor.getParameterTypes.length != newArgs.length) {
false
} else if (newArgs.contains(null)) {
false
} else {
val argsClasses: Array[Class[_]] = newArgs.map(_.getClass)
- ClassUtils.isAssignable(argsClasses, ctor.getParameterTypes)
+ TypeCheckUtils.isAssignable(argsClasses, ctor.getParameterTypes)
}
- }.getOrElse(ctors.maxBy(_.getParameterTypes.size))
+ }.getOrElse(ctors.maxBy(_.getParameterTypes.length))
try {
defaultCtor.newInstance(newArgs: _*).asInstanceOf[A]
} catch {
case e: Throwable =>
throw new RuntimeException(
- s"Fail to copy treeNode ${getClass.getName}:
${e.getStackTraceString}")
+ s"Fail to copy tree node ${getClass.getName}.", e)
}
}
}
diff --git
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
index da7cf64..a2acb0c 100644
---
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
+++
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
@@ -19,12 +19,9 @@ package org.apache.flink.table.runtime.functions
import java.lang.{StringBuilder, Long => JLong}
import java.math.{BigDecimal => JBigDecimal}
-import java.nio.charset.StandardCharsets
-import java.util.regex.Matcher
-import java.util.regex.Pattern
+import java.util.regex.{Matcher, Pattern}
-import org.apache.commons.codec.binary.{Base64, Hex}
-import org.apache.commons.lang3.StringUtils
+import org.apache.flink.table.utils.EncodingUtils
import scala.annotation.varargs
@@ -269,25 +266,25 @@ object ScalarFunctions {
/**
* Returns the base string decoded with base64.
*/
- def fromBase64(str: String): String =
- new String(Base64.decodeBase64(str), StandardCharsets.UTF_8)
+ def fromBase64(base64: String): String =
+ EncodingUtils.decodeBase64ToString(base64)
/**
* Returns the base64-encoded result of the input string.
*/
- def toBase64(base: String): String =
- Base64.encodeBase64String(base.getBytes(StandardCharsets.UTF_8))
+ def toBase64(string: String): String =
+ EncodingUtils.encodeStringToBase64(string)
/**
* Returns the hex string of a long argument.
*/
- def hex(x: Long): String = JLong.toHexString(x).toUpperCase()
+ def hex(string: Long): String = JLong.toHexString(string).toUpperCase()
/**
* Returns the hex string of a string argument.
*/
- def hex(x: String): String =
- Hex.encodeHexString(x.getBytes(StandardCharsets.UTF_8)).toUpperCase()
+ def hex(string: String): String =
+ EncodingUtils.hex(string).toUpperCase()
/**
* Returns an UUID string using Java utilities.
@@ -297,6 +294,6 @@ object ScalarFunctions {
/**
* Returns a string that repeats the base string n times.
*/
- def repeat(base: String, n: Int): String = StringUtils.repeat(base, n)
+ def repeat(base: String, n: Int): String = EncodingUtils.repeat(base, n)
}
diff --git
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
index 02af798..25367ad 100644
---
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
+++
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
@@ -183,4 +183,95 @@ object TypeCheckUtils {
}
}
}
+
+ /**
+ * Checks if a class is a Java primitive wrapper.
+ */
+ def isPrimitiveWrapper(clazz: Class[_]): Boolean = {
+ clazz == classOf[java.lang.Boolean] ||
+ clazz == classOf[java.lang.Byte] ||
+ clazz == classOf[java.lang.Character] ||
+ clazz == classOf[java.lang.Short] ||
+ clazz == classOf[java.lang.Integer] ||
+ clazz == classOf[java.lang.Long] ||
+ clazz == classOf[java.lang.Double] ||
+ clazz == classOf[java.lang.Float]
+ }
+
+ /**
+ * Checks element-wise if an array of classes can be assigned to another array of classes.
+ *
+ * Adapted from o.a.commons.lang.ClassUtils#isAssignable(java.lang.Class[],
java.lang.Class[])
+ * but without null checks.
+ */
+ def isAssignable(classArray: Array[Class[_]], toClassArray:
Array[Class[_]]): Boolean = {
+ if (classArray.length != toClassArray.length) {
+ return false
+ }
+ var i = 0
+ while (i < classArray.length) {
+ if (!isAssignable(classArray(i), toClassArray(i))) {
+ return false
+ }
+ i += 1
+ }
+ true
+ }
+
+ /**
+ * Checks if one class can be assigned to a variable of another class.
+ *
+ * Adapted from o.a.commons.lang.ClassUtils#isAssignable(java.lang.Class,
java.lang.Class) but
+ * without null checks.
+ */
+ def isAssignable(cls: Class[_], toClass: Class[_]): Boolean = {
+ if (cls.equals(toClass)) {
+ return true
+ }
+ if (cls.isPrimitive) {
+ if (!toClass.isPrimitive) {
+ return false
+ }
+ if (java.lang.Integer.TYPE.equals(cls)) {
+ return java.lang.Long.TYPE.equals(toClass) ||
+ java.lang.Float.TYPE.equals(toClass) ||
+ java.lang.Double.TYPE.equals(toClass)
+ }
+ if (java.lang.Long.TYPE.equals(cls)) {
+ return java.lang.Float.TYPE.equals(toClass) ||
+ java.lang.Double.TYPE.equals(toClass)
+ }
+ if (java.lang.Boolean.TYPE.equals(cls)) {
+ return false
+ }
+ if (java.lang.Double.TYPE.equals(cls)) {
+ return false
+ }
+ if (java.lang.Float.TYPE.equals(cls)) {
+ return java.lang.Double.TYPE.equals(toClass)
+ }
+ if (java.lang.Character.TYPE.equals(cls)) {
+ return java.lang.Integer.TYPE.equals(toClass) ||
+ java.lang.Long.TYPE.equals(toClass) ||
+ java.lang.Float.TYPE.equals(toClass) ||
+ java.lang.Double.TYPE.equals(toClass)
+ }
+ if (java.lang.Short.TYPE.equals(cls)) {
+ return java.lang.Integer.TYPE.equals(toClass) ||
+ java.lang.Long.TYPE.equals(toClass) ||
+ java.lang.Float.TYPE.equals(toClass) ||
+ java.lang.Double.TYPE.equals(toClass)
+ }
+ if (java.lang.Byte.TYPE.equals(cls)) {
+ return java.lang.Short.TYPE.equals(toClass) ||
+ java.lang.Integer.TYPE.equals(toClass) ||
+ java.lang.Long.TYPE.equals(toClass) ||
+ java.lang.Float.TYPE.equals(toClass) ||
+ java.lang.Double.TYPE.equals(toClass)
+ }
+ // should never get here
+ return false
+ }
+ toClass.isAssignableFrom(cls)
+ }
}
diff --git
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
index 28b7d14..f70d991 100644
---
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
+++
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
@@ -30,12 +30,12 @@ import
org.apache.flink.streaming.runtime.streamrecord.StreamRecord
import
org.apache.flink.streaming.util.{KeyedOneInputStreamOperatorTestHarness,
TestHarnessUtil}
import org.apache.flink.table.api.StreamQueryConfig
import org.apache.flink.table.codegen.GeneratedAggregationsFunction
-import org.apache.flink.table.functions.AggregateFunction
-import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
+import org.apache.flink.table.functions.{AggregateFunction,
UserDefinedFunction}
import
org.apache.flink.table.functions.aggfunctions.{IntSumWithRetractAggFunction,
LongMaxWithRetractAggFunction, LongMinWithRetractAggFunction}
import
org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.getAccumulatorTypeOfAggregateFunction
import
org.apache.flink.table.runtime.harness.HarnessTestBase.{RowResultSortComparator,
RowResultSortComparatorWithWatermarks}
import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.table.utils.EncodingUtils
import org.junit.Rule
import org.junit.rules.ExpectedException
@@ -47,13 +47,13 @@ class HarnessTestBase {
def thrown = expectedException
val longMinWithRetractAggFunction: String =
- UserDefinedFunctionUtils.serialize(new LongMinWithRetractAggFunction)
+ EncodingUtils.encodeObjectToString(new LongMinWithRetractAggFunction)
val longMaxWithRetractAggFunction: String =
- UserDefinedFunctionUtils.serialize(new LongMaxWithRetractAggFunction)
+ EncodingUtils.encodeObjectToString(new LongMaxWithRetractAggFunction)
val intSumWithRetractAggFunction: String =
- UserDefinedFunctionUtils.serialize(new IntSumWithRetractAggFunction)
+ EncodingUtils.encodeObjectToString(new IntSumWithRetractAggFunction)
protected val MinMaxRowType = new RowTypeInfo(Array[TypeInformation[_]](
LONG_TYPE_INFO,
@@ -97,12 +97,14 @@ class HarnessTestBase {
| public MinMaxAggregateHelper() throws Exception {
|
| fmin =
(org.apache.flink.table.functions.aggfunctions.LongMinWithRetractAggFunction)
- | org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
- | .deserialize("$longMinWithRetractAggFunction");
+ | ${classOf[EncodingUtils].getCanonicalName}.decodeStringToObject(
+ | "$longMinWithRetractAggFunction",
+ | ${classOf[UserDefinedFunction].getCanonicalName}.class);
|
| fmax =
(org.apache.flink.table.functions.aggfunctions.LongMaxWithRetractAggFunction)
- | org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
- | .deserialize("$longMaxWithRetractAggFunction");
+ | ${classOf[EncodingUtils].getCanonicalName}.decodeStringToObject(
+ | "$longMaxWithRetractAggFunction",
+ | ${classOf[UserDefinedFunction].getCanonicalName}.class);
| }
|
| public void setAggregationResults(
@@ -220,8 +222,9 @@ class HarnessTestBase {
| public SumAggregationHelper() throws Exception {
|
|sum =
(org.apache.flink.table.functions.aggfunctions.IntSumWithRetractAggFunction)
- |org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
- |.deserialize("$intSumWithRetractAggFunction");
+ |${classOf[EncodingUtils].getCanonicalName}.decodeStringToObject(
+ | "$intSumWithRetractAggFunction",
+ | ${classOf[UserDefinedFunction].getCanonicalName}.class);
|}
|
| public final void setAggregationResults(
diff --git
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeCheckUtilsTest.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeCheckUtilsTest.scala
index 645e608..8f5ff54 100644
---
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeCheckUtilsTest.scala
+++
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeCheckUtilsTest.scala
@@ -22,6 +22,7 @@ import org.apache.flink.api.scala._
import org.apache.flink.api.scala.typeutils.{Types => ScalaTypes}
import org.apache.flink.table.api.{Types, ValidationException}
import org.apache.flink.table.typeutils.TypeCheckUtils.validateEqualsHashCode
+import org.junit.Assert.{assertFalse, assertTrue}
import org.junit.Test
class TypeCheckUtilsTest {
@@ -51,4 +52,24 @@ class TypeCheckUtilsTest {
def testInvalidType3(): Unit = {
validateEqualsHashCode("", Types.OBJECT_ARRAY[Nothing](ScalaTypes.NOTHING))
}
+
+ @Test
+ def testPrimitiveWrapper (): Unit = {
+ assertTrue(TypeCheckUtils.isPrimitiveWrapper(classOf[java.lang.Double]))
+ assertFalse(TypeCheckUtils.isPrimitiveWrapper(classOf[Double]))
+ }
+
+ @Test
+ def testAssignability(): Unit = {
+ assertTrue(TypeCheckUtils.isAssignable(classOf[Double], classOf[Double]))
+ assertFalse(TypeCheckUtils.isAssignable(classOf[Boolean], classOf[Double]))
+ assertTrue(TypeCheckUtils.isAssignable(
+ classOf[java.util.HashMap[_, _]], classOf[java.util.Map[_, _]]))
+ assertFalse(TypeCheckUtils.isAssignable(
+ classOf[java.util.Map[_, _]], classOf[java.util.HashMap[_, _]]))
+
+ assertTrue(TypeCheckUtils.isAssignable(
+ Array[Class[_]](classOf[Double], classOf[Double]),
+ Array[Class[_]](classOf[Double], classOf[Double])))
+ }
}