This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f8b8150b48d5c35f1b294b5d8f27ee5bba8fead5
Author: Timo Walther <[email protected]>
AuthorDate: Mon May 18 13:03:59 2020 +0200

    [hotfix][table] Update FLIP-65 functions to new data structure converters
---
 .../flink/table/types/utils/DataTypeUtils.java     |  10 ++
 .../flink/table/types/utils/DataTypeUtilsTest.java |  12 ++
 .../flink/table/planner/codegen/CodeGenUtils.scala | 134 ++++++++++++++++++++-
 .../planner/codegen/CodeGeneratorContext.scala     |  33 ++++-
 .../table/planner/codegen/ExpressionReducer.scala  |   9 ++
 .../codegen/calls/BridgingSqlFunctionCallGen.scala |  30 ++---
 .../conversion/ArrayBooleanArrayConverter.java     |   2 +-
 .../data/conversion/ArrayByteArrayConverter.java   |   2 +-
 .../data/conversion/ArrayDoubleArrayConverter.java |   2 +-
 .../data/conversion/ArrayFloatArrayConverter.java  |   2 +-
 .../data/conversion/ArrayIntArrayConverter.java    |   2 +-
 .../data/conversion/ArrayLongArrayConverter.java   |   2 +-
 .../data/conversion/ArrayObjectArrayConverter.java |   2 +-
 .../data/conversion/ArrayShortArrayConverter.java  |   2 +-
 .../table/data/conversion/DateDateConverter.java   |   2 +-
 .../data/conversion/DateLocalDateConverter.java    |   2 +-
 .../DayTimeIntervalDurationConverter.java          |   2 +-
 .../conversion/DecimalBigDecimalConverter.java     |   2 +-
 .../table/data/conversion/IdentityConverter.java   |   2 +-
 .../LocalZonedTimestampInstantConverter.java       |   2 +-
 .../LocalZonedTimestampIntConverter.java           |   2 +-
 .../LocalZonedTimestampLongConverter.java          |   2 +-
 .../table/data/conversion/MapMapConverter.java     |   2 +-
 .../data/conversion/RawByteArrayConverter.java     |   2 +-
 .../table/data/conversion/RawObjectConverter.java  |   2 +-
 .../table/data/conversion/RowRowConverter.java     |   2 +-
 .../data/conversion/StringByteArrayConverter.java  |   2 +-
 .../data/conversion/StringStringConverter.java     |   2 +-
 .../data/conversion/StructuredObjectConverter.java |   2 +-
 .../data/conversion/TimeLocalTimeConverter.java    |   2 +-
 .../table/data/conversion/TimeLongConverter.java   |   2 +-
 .../table/data/conversion/TimeTimeConverter.java   |   2 +-
 .../TimestampLocalDateTimeConverter.java           |   2 +-
 .../conversion/TimestampTimestampConverter.java    |   2 +-
 .../YearMonthIntervalPeriodConverter.java          |   2 +-
 35 files changed, 233 insertions(+), 53 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java
index 84c8242..b0c74d1 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java
@@ -47,7 +47,9 @@ import java.util.List;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static org.apache.flink.table.types.extraction.ExtractionUtils.primitiveToWrapper;
 import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getFieldNames;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeUtils.toInternalConversionClass;
 
 /**
  * Utilities for handling {@link DataType}s.
@@ -56,6 +58,14 @@ import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getFi
 public final class DataTypeUtils {
 
        /**
+        * Checks whether a given data type is an internal data structure.
+        */
+       public static boolean isInternal(DataType dataType) {
+               final Class<?> clazz = primitiveToWrapper(dataType.getConversionClass());
+               return clazz == toInternalConversionClass(dataType.getLogicalType());
+       }
+
+       /**
         * Replaces the {@link LogicalType} of a {@link DataType}, i.e., it keeps the bridging class.
         */
        public static DataType replaceLogicalType(DataType dataType, LogicalType replacement) {
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/utils/DataTypeUtilsTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/utils/DataTypeUtilsTest.java
index c650a60..07edff7 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/utils/DataTypeUtilsTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/utils/DataTypeUtilsTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.FieldsDataType;
 import org.apache.flink.table.types.logical.DistinctType;
@@ -42,12 +43,23 @@ import static org.apache.flink.table.api.DataTypes.ROW;
 import static org.apache.flink.table.api.DataTypes.STRING;
 import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
 import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Tests for {@link DataTypeUtils}.
  */
 public class DataTypeUtilsTest {
+
+       @Test
+       public void testIsInternalClass() {
+               assertTrue(DataTypeUtils.isInternal(DataTypes.INT()));
+               assertTrue(DataTypeUtils.isInternal(DataTypes.INT().notNull().bridgedTo(int.class)));
+               assertTrue(DataTypeUtils.isInternal(DataTypes.ROW().bridgedTo(RowData.class)));
+               assertFalse(DataTypeUtils.isInternal(DataTypes.ROW()));
+       }
+
        @Test
        public void testExpandRowType() {
                DataType dataType = ROW(
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
index 6e62a3f..786d573 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
@@ -27,6 +27,7 @@ import org.apache.flink.core.memory.MemorySegment
 import org.apache.flink.table.data._
 import org.apache.flink.table.data.binary.BinaryRowDataUtil.BYTE_ARRAY_BASE_OFFSET
 import org.apache.flink.table.data.binary._
+import org.apache.flink.table.data.conversion.DataStructureConverters
 import org.apache.flink.table.data.util.DataFormatConverters
 import org.apache.flink.table.data.util.DataFormatConverters.IdentityConverter
 import org.apache.flink.table.functions.UserDefinedFunction
@@ -42,6 +43,8 @@ import org.apache.flink.table.types.logical.LogicalTypeRoot._
 import org.apache.flink.table.types.logical._
 import org.apache.flink.table.types.logical.utils.LogicalTypeChecks.{getFieldCount, getPrecision, getScale, hasRoot}
 import org.apache.flink.table.types.logical.utils.LogicalTypeUtils.toInternalConversionClass
+import org.apache.flink.table.types.utils.DataTypeUtils
+import org.apache.flink.table.types.utils.DataTypeUtils.isInternal
 import org.apache.flink.types.{Row, RowKind}
 
 import scala.annotation.tailrec
@@ -776,9 +779,118 @@ object CodeGenUtils {
   // -------------------------- Data Structure Conversion  -------------------------------
 
   /**
+   * Generates code for converting the given term of external data type to an internal data
+   * structure.
+   *
+   * Use this function for converting at the edges of the API where primitive types CAN NOT occur
+   * and NO NULL CHECKING is required as it might have been done by surrounding layers.
+   */
+  def genToInternalConverter(
+      ctx: CodeGeneratorContext,
+      sourceDataType: DataType)
+    : String => String = {
+    if (isInternal(sourceDataType)) {
+      externalTerm => s"$externalTerm"
+    } else {
+      val converter = DataStructureConverters.getConverter(sourceDataType)
+      val internalTypeTerm = boxedTypeTermForType(sourceDataType.getLogicalType)
+      val externalTypeTerm = typeTerm(sourceDataType.getConversionClass)
+      val converterTerm = ctx.addReusableConverter(converter)
+      externalTerm =>
+        s"($internalTypeTerm) $converterTerm.toInternalOrNull(($externalTypeTerm) $externalTerm)"
+    }
+  }
+
+  /**
+   * Generates code for converting the given term of external data type to an internal data
+   * structure.
+   *
+   * Use this function for converting at the edges of the API where PRIMITIVE TYPES can occur or
+   * the RESULT CAN BE NULL.
+   */
+  def genToInternalConverterAll(
+      ctx: CodeGeneratorContext,
+      sourceDataType: DataType,
+      externalTerm: String)
+    : GeneratedExpression = {
+    val sourceType = sourceDataType.getLogicalType
+    val sourceClass = sourceDataType.getConversionClass
+    // convert external source type to internal structure
+    val internalResultTerm = if (isInternal(sourceDataType)) {
+      s"$externalTerm"
+    } else {
+      genToInternalConverter(ctx, sourceDataType)(externalTerm)
+    }
+    // extract null term from result term
+    if (sourceClass.isPrimitive) {
+      generateNonNullField(sourceType, internalResultTerm)
+    } else {
+      generateInputFieldUnboxing(ctx, sourceType, externalTerm, internalResultTerm)
+    }
+  }
+
+  /**
+   * Generates code for converting the given term of internal data structure to the given
+   * external target data type.
+   *
+   * Use this function for converting at the edges of the API where primitive types CAN NOT occur
+   * and NO NULL CHECKING is required as it might have been done by surrounding layers.
+   */
+  def genToExternalConverter(
+      ctx: CodeGeneratorContext,
+      targetDataType: DataType,
+      internalTerm: String)
+    : String = {
+    if (isInternal(targetDataType)) {
+      s"$internalTerm"
+    } else {
+      val converter = DataStructureConverters.getConverter(targetDataType)
+      val internalTypeTerm = boxedTypeTermForType(targetDataType.getLogicalType)
+      val externalTypeTerm = typeTerm(targetDataType.getConversionClass)
+      val converterTerm = ctx.addReusableConverter(converter)
+      s"($externalTypeTerm) $converterTerm.toExternal(($internalTypeTerm) $internalTerm)"
+    }
+  }
+
+  /**
+   * Generates code for converting the given expression of internal data structure to the given
+   * external target data type.
+   *
+   * Use this function for converting at the edges of the API where PRIMITIVE TYPES can occur or
+   * the RESULT CAN BE NULL.
+   */
+  def genToExternalConverterAll(
+      ctx: CodeGeneratorContext,
+      targetDataType: DataType,
+      internalExpr: GeneratedExpression)
+    : String = {
+    val targetType = targetDataType.getLogicalType
+    val targetTypeTerm = boxedTypeTermForType(targetType)
+
+    // untyped null literal
+    if (hasRoot(internalExpr.resultType, NULL)) {
+      return s"($targetTypeTerm) null"
+    }
+
+    // convert internal structure to target type
+    val externalResultTerm = if (isInternal(targetDataType)) {
+      s"($targetTypeTerm) ${internalExpr.resultTerm}"
+    } else {
+      genToExternalConverter(ctx, targetDataType, internalExpr.resultTerm)
+    }
+    // merge null term into the result term
+    if (targetDataType.getConversionClass.isPrimitive) {
+      externalResultTerm
+    } else {
+      s"${internalExpr.nullTerm} ? null : ($externalResultTerm)"
+    }
+  }
+
+  /**
     * If it's internally compatible, don't need to DataStructure converter.
     * clazz != classOf[Row] => Row can only infer GenericType[Row].
     */
+  @deprecated
   def isInternalClass(t: DataType): Boolean = {
     val clazz = t.getConversionClass
     clazz != classOf[Object] && clazz != classOf[Row] &&
@@ -786,10 +898,15 @@ object CodeGenUtils {
             clazz == toInternalConversionClass(fromDataTypeToLogicalType(t)))
   }
 
+  @deprecated
   private def isConverterIdentity(t: DataType): Boolean = {
     DataFormatConverters.getConverterForDataType(t).isInstanceOf[IdentityConverter[_]]
   }
 
+  /**
+   * @deprecated This uses the legacy [[DataFormatConverters]] including legacy types.
+   */
+  @deprecated
   def genToInternal(ctx: CodeGeneratorContext, t: DataType, term: String): String =
     genToInternal(ctx, t)(term)
 
@@ -798,7 +915,10 @@ object CodeGenUtils {
    *
    * Use this function for converting at the edges of the API where primitive types CAN NOT occur
    * and NO NULL CHECKING is required as it might have been done by surrounding layers.
+   *
+   * @deprecated This uses the legacy [[DataFormatConverters]] including legacy types.
    */
+  @deprecated
   def genToInternal(ctx: CodeGeneratorContext, t: DataType): String => String = {
     if (isConverterIdentity(t)) {
       term => s"$term"
@@ -813,11 +933,10 @@ object CodeGenUtils {
   }
 
   /**
-   * Generates code for converting the given external source data type to the internal data format.
    *
-   * Use this function for converting at the edges of the API where PRIMITIVE TYPES can occur or
-   * the RESULT CAN BE NULL.
+   * @deprecated This uses the legacy [[DataFormatConverters]] including legacy types.
    */
+  @deprecated
   def genToInternalIfNeeded(
       ctx: CodeGeneratorContext,
       sourceDataType: DataType,
@@ -839,6 +958,10 @@ object CodeGenUtils {
     }
   }
 
+  /**
+   * @deprecated This uses the legacy [[DataFormatConverters]] including legacy types.
+   */
+  @deprecated
   def genToExternal(
       ctx: CodeGeneratorContext,
       targetType: DataType,
@@ -856,10 +979,9 @@ object CodeGenUtils {
   }
 
   /**
-   * Generates code for converting the internal data format to the given external target data type.
-   *
-   * Use this function for converting at the edges of the API.
+   * @deprecated This uses the legacy [[DataFormatConverters]] including legacy types.
    */
+  @deprecated
   def genToExternalIfNeeded(
       ctx: CodeGeneratorContext,
       targetDataType: DataType,
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
index d15d2e5..5ef90a2 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
@@ -32,11 +32,11 @@ import org.apache.flink.table.runtime.util.collections._
 import org.apache.flink.table.types.logical.LogicalTypeRoot._
 import org.apache.flink.table.types.logical._
 import org.apache.flink.util.InstantiationUtil
-
 import org.apache.calcite.avatica.util.DateTimeUtils
-
 import java.util.TimeZone
 
+import org.apache.flink.table.data.conversion.DataStructureConverter
+
 import scala.collection.mutable
 
 /**
@@ -665,6 +665,33 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
   }
 
   /**
+   * Adds a reusable [[DataStructureConverter]] to the member area of the generated class.
+   *
+   * @param converter converter to be added
+   * @param classLoaderTerm term to access the [[ClassLoader]] for user-defined classes
+   */
+  def addReusableConverter(
+      converter: DataStructureConverter[_, _],
+      classLoaderTerm: String = null)
+    : String = {
+
+    val converterTerm = addReusableObject(converter, "converter")
+
+    val openConverter = if (classLoaderTerm != null) {
+      s"""
+         |$converterTerm.open($classLoaderTerm);
+       """.stripMargin
+    } else {
+      s"""
+         |$converterTerm.open(getRuntimeContext().getUserCodeClassLoader());
+       """.stripMargin
+    }
+    reusableOpenStatements.add(openConverter)
+
+    converterTerm
+  }
+
+  /**
     * Adds a reusable [[TypeSerializer]] to the member area of the generated class.
     *
     * @param t the internal type which used to generate internal type serializer
@@ -678,7 +705,7 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
 
       case None =>
         val term = newName("typeSerializer")
-        val ser = InternalSerializers.create(t, new ExecutionConfig)
+        val ser = InternalSerializers.create(t)
         addReusableObjectInternal(ser, term, ser.getClass.getCanonicalName)
         reusableTypeSerializers(t) = term
         term
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExpressionReducer.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExpressionReducer.scala
index 82a0122..46d151f 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExpressionReducer.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExpressionReducer.scala
@@ -35,6 +35,8 @@ import org.apache.calcite.rex.{RexBuilder, RexExecutor, RexNode}
 import org.apache.calcite.sql.`type`.SqlTypeName
 import java.io.File
 
+import org.apache.flink.table.data.conversion.DataStructureConverter
+
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
 
@@ -275,4 +277,11 @@ class ConstantCodeGeneratorContext(tableConfig: TableConfig)
       runtimeContextTerm: String = null): String = {
     super.addReusableFunction(function, classOf[ConstantFunctionContext], "parameters")
   }
+
+  override def addReusableConverter(
+      converter: DataStructureConverter[_, _],
+      classLoaderTerm: String = null)
+    : String = {
+    super.addReusableConverter(converter, "this.getClass().getClassLoader()")
+  }
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/BridgingSqlFunctionCallGen.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/BridgingSqlFunctionCallGen.scala
index a15ac35..abfaffb 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/BridgingSqlFunctionCallGen.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/BridgingSqlFunctionCallGen.scala
@@ -18,10 +18,14 @@
 
 package org.apache.flink.table.planner.codegen.calls
 
+import java.lang.reflect.Method
+import java.util.Collections
+
+import org.apache.calcite.rex.{RexCall, RexCallBinding}
 import org.apache.flink.table.data.GenericRowData
 import org.apache.flink.table.functions.UserDefinedFunctionHelper.{SCALAR_EVAL, TABLE_EVAL}
 import org.apache.flink.table.functions.{FunctionKind, ScalarFunction, TableFunction, UserDefinedFunction}
-import org.apache.flink.table.planner.codegen.CodeGenUtils.{genToExternalIfNeeded, genToInternalIfNeeded, newName, typeTerm}
+import org.apache.flink.table.planner.codegen.CodeGenUtils._
 import org.apache.flink.table.planner.codegen.GeneratedExpression.NEVER_NULL
 import org.apache.flink.table.planner.codegen._
 import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction
@@ -29,17 +33,13 @@ import org.apache.flink.table.planner.functions.inference.OperatorBindingCallCon
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil.toScala
 import org.apache.flink.table.runtime.collector.WrappingCollector
 import org.apache.flink.table.types.DataType
+import org.apache.flink.table.types.extraction.ExtractionUtils
 import org.apache.flink.table.types.extraction.ExtractionUtils.{createMethodSignatureString, isAssignable, isInvokable, primitiveToWrapper}
 import org.apache.flink.table.types.inference.TypeInferenceUtil
 import org.apache.flink.table.types.logical.utils.LogicalTypeCasts.supportsAvoidingCast
 import org.apache.flink.table.types.logical.utils.LogicalTypeChecks.{hasRoot, isCompositeType}
 import org.apache.flink.table.types.logical.{LogicalType, LogicalTypeRoot, RowType}
 import org.apache.flink.util.Preconditions
-import org.apache.calcite.rex.{RexCall, RexCallBinding}
-import java.lang.reflect.Method
-import java.util.Collections
-
-import org.apache.flink.table.types.extraction.ExtractionUtils
 
 /**
  * Generates a call to a user-defined [[ScalarFunction]] or [[TableFunction]].
@@ -110,15 +110,15 @@ class BridgingSqlFunctionCallGen(call: RexCall) extends CallGenerator {
 
     if (function.getDefinition.getKind == FunctionKind.TABLE) {
       Preconditions.checkState(
-        hasRoot(returnType, LogicalTypeRoot.ROW),
-        "Logical output type of function call should be a ROW type.",
+        isCompositeType(returnType),
+        "Logical output type of function call should be a composite type.",
         Seq(): _*)
       generateTableFunctionCall(
         ctx,
         functionTerm,
         externalOperands,
         outputDataType,
-        returnType.asInstanceOf[RowType])
+        returnType)
     } else {
       generateScalarFunctionCall(ctx, functionTerm, externalOperands, outputDataType)
     }
@@ -129,7 +129,7 @@ class BridgingSqlFunctionCallGen(call: RexCall) extends CallGenerator {
       functionTerm: String,
       externalOperands: Seq[GeneratedExpression],
       functionOutputDataType: DataType,
-      outputType: RowType)
+      outputType: LogicalType)
     : GeneratedExpression = {
     val resultCollectorTerm = generateResultCollector(ctx, functionOutputDataType, outputType)
 
@@ -160,7 +160,7 @@ class BridgingSqlFunctionCallGen(call: RexCall) extends CallGenerator {
   def generateResultCollector(
       ctx: CodeGeneratorContext,
       outputDataType: DataType,
-      returnType: RowType)
+      returnType: LogicalType)
     : String = {
     val outputType = outputDataType.getLogicalType
 
@@ -172,7 +172,7 @@ class BridgingSqlFunctionCallGen(call: RexCall) extends CallGenerator {
       val resultGenerator = new ExprCodeGenerator(collectorCtx, outputType.isNullable)
         .bindInput(outputType, externalResultTerm)
       val wrappedResult = resultGenerator.generateConverterResultExpression(
-        returnType,
+        returnType.asInstanceOf[RowType],
         classOf[GenericRowData])
       s"""
        |${wrappedResult.code}
@@ -193,7 +193,7 @@ class BridgingSqlFunctionCallGen(call: RexCall) extends CallGenerator {
       outputType,
       externalResultTerm,
       // nullability is handled by the expression code generator if necessary
-      CodeGenUtils.genToInternal(ctx, outputDataType),
+      genToInternalConverter(ctx, outputDataType),
       collectorCode)
     val resultCollectorTerm = newName("resultConverterCollector")
     CollectorCodeGenerator.addToContext(ctx, resultCollectorTerm, resultCollector)
@@ -221,7 +221,7 @@ class BridgingSqlFunctionCallGen(call: RexCall) extends CallGenerator {
       s"($externalResultTypeTerm) (${typeTerm(externalResultClassBoxed)})"
     }
     val externalResultTerm = ctx.addReusableLocalVariable(externalResultTypeTerm, "externalResult")
-    val internalExpr = genToInternalIfNeeded(ctx, outputDataType, externalResultTerm)
+    val internalExpr = genToInternalConverterAll(ctx, outputDataType, externalResultTerm)
 
     // function call
     internalExpr.copy(code =
@@ -241,7 +241,7 @@ class BridgingSqlFunctionCallGen(call: RexCall) extends CallGenerator {
     operands
       .zip(argumentDataTypes)
       .map { case (operand, dataType) =>
-        operand.copy(resultTerm = genToExternalIfNeeded(ctx, dataType, operand))
+        operand.copy(resultTerm = genToExternalConverterAll(ctx, dataType, operand))
       }
   }
 
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayBooleanArrayConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayBooleanArrayConverter.java
index 3939413..e5eb438 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayBooleanArrayConverter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayBooleanArrayConverter.java
@@ -27,7 +27,7 @@ import org.apache.flink.table.types.logical.ArrayType;
  * Converter for {@link ArrayType} of {@code boolean[]} external type.
  */
 @Internal
-class ArrayBooleanArrayConverter implements DataStructureConverter<ArrayData, boolean[]> {
+public class ArrayBooleanArrayConverter implements DataStructureConverter<ArrayData, boolean[]> {
 
        private static final long serialVersionUID = 1L;
 
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayByteArrayConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayByteArrayConverter.java
index 6c8b665..05baa77 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayByteArrayConverter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayByteArrayConverter.java
@@ -27,7 +27,7 @@ import org.apache.flink.table.types.logical.ArrayType;
  * Converter for {@link ArrayType} of {@code byte[]} external type.
  */
 @Internal
-class ArrayByteArrayConverter implements DataStructureConverter<ArrayData, byte[]> {
+public class ArrayByteArrayConverter implements DataStructureConverter<ArrayData, byte[]> {
 
        private static final long serialVersionUID = 1L;
 
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayDoubleArrayConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayDoubleArrayConverter.java
index b442ff9..f013271 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayDoubleArrayConverter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayDoubleArrayConverter.java
@@ -27,7 +27,7 @@ import org.apache.flink.table.types.logical.ArrayType;
  * Converter for {@link ArrayType} of {@code double[]} external type.
  */
 @Internal
-class ArrayDoubleArrayConverter implements DataStructureConverter<ArrayData, double[]> {
+public class ArrayDoubleArrayConverter implements DataStructureConverter<ArrayData, double[]> {
 
        private static final long serialVersionUID = 1L;
 
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayFloatArrayConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayFloatArrayConverter.java
index 3b8bf15..90be3de 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayFloatArrayConverter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayFloatArrayConverter.java
@@ -27,7 +27,7 @@ import org.apache.flink.table.types.logical.ArrayType;
  * Converter for {@link ArrayType} of {@code float[]} external type.
  */
 @Internal
-class ArrayFloatArrayConverter implements DataStructureConverter<ArrayData, float[]> {
+public class ArrayFloatArrayConverter implements DataStructureConverter<ArrayData, float[]> {
 
        private static final long serialVersionUID = 1L;
 
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayIntArrayConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayIntArrayConverter.java
index fe8880a..5622965 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayIntArrayConverter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayIntArrayConverter.java
@@ -27,7 +27,7 @@ import org.apache.flink.table.types.logical.ArrayType;
  * Converter for {@link ArrayType} of {@code int[]} external type.
  */
 @Internal
-class ArrayIntArrayConverter implements DataStructureConverter<ArrayData, int[]> {
+public class ArrayIntArrayConverter implements DataStructureConverter<ArrayData, int[]> {
 
        private static final long serialVersionUID = 1L;
 
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayLongArrayConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayLongArrayConverter.java
index 963d146..c495355 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayLongArrayConverter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayLongArrayConverter.java
@@ -27,7 +27,7 @@ import org.apache.flink.table.types.logical.ArrayType;
  * Converter for {@link ArrayType} of {@code long[]} external type.
  */
 @Internal
-class ArrayLongArrayConverter implements DataStructureConverter<ArrayData, long[]> {
+public class ArrayLongArrayConverter implements DataStructureConverter<ArrayData, long[]> {
 
        private static final long serialVersionUID = 1L;
 
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayObjectArrayConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayObjectArrayConverter.java
index 761d758..5049064 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayObjectArrayConverter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayObjectArrayConverter.java
@@ -39,7 +39,7 @@ import java.lang.reflect.Array;
  */
 @Internal
 @SuppressWarnings("unchecked")
-class ArrayObjectArrayConverter<E> implements DataStructureConverter<ArrayData, E[]> {
+public class ArrayObjectArrayConverter<E> implements DataStructureConverter<ArrayData, E[]> {
 
        private static final long serialVersionUID = 1L;
 
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayShortArrayConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayShortArrayConverter.java
index 3b48ea4..7d536d8 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayShortArrayConverter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayShortArrayConverter.java
@@ -27,7 +27,7 @@ import org.apache.flink.table.types.logical.ArrayType;
  * Converter for {@link ArrayType} of {@code short[]} external type.
  */
 @Internal
-class ArrayShortArrayConverter implements DataStructureConverter<ArrayData, short[]> {
+public class ArrayShortArrayConverter implements DataStructureConverter<ArrayData, short[]> {
 
        private static final long serialVersionUID = 1L;
 
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DateDateConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DateDateConverter.java
index e980891..886591d 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DateDateConverter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DateDateConverter.java
@@ -26,7 +26,7 @@ import org.apache.flink.table.types.logical.DateType;
  * Converter for {@link DateType} of {@link java.sql.Date} external type.
  */
 @Internal
-class DateDateConverter implements DataStructureConverter<Integer, java.sql.Date> {
+public class DateDateConverter implements DataStructureConverter<Integer, java.sql.Date> {
 
        private static final long serialVersionUID = 1L;
 
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DateLocalDateConverter.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DateLocalDateConverter.java
index 6df885d..d707d39 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DateLocalDateConverter.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DateLocalDateConverter.java
@@ -26,7 +26,7 @@ import org.apache.flink.table.types.logical.DateType;
  * Converter for {@link DateType} of {@link java.time.LocalDate} external type.
  */
 @Internal
-class DateLocalDateConverter implements DataStructureConverter<Integer, java.time.LocalDate> {
+public class DateLocalDateConverter implements DataStructureConverter<Integer, java.time.LocalDate> {
 
        private static final long serialVersionUID = 1L;
 
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DayTimeIntervalDurationConverter.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DayTimeIntervalDurationConverter.java
index 1ea01c5..d3da668 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DayTimeIntervalDurationConverter.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DayTimeIntervalDurationConverter.java
@@ -27,7 +27,7 @@ import java.time.Duration;
  * Converter for {@link DayTimeIntervalType} of {@link java.time.Duration} external type.
  */
 @Internal
-class DayTimeIntervalDurationConverter implements DataStructureConverter<Long, java.time.Duration> {
+public class DayTimeIntervalDurationConverter implements DataStructureConverter<Long, java.time.Duration> {
 
        private static final long serialVersionUID = 1L;
 
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DecimalBigDecimalConverter.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DecimalBigDecimalConverter.java
index 850b6a0..76832a4 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DecimalBigDecimalConverter.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DecimalBigDecimalConverter.java
@@ -29,7 +29,7 @@ import java.math.BigDecimal;
  * Converter for {@link DecimalType} of {@link BigDecimal} external type.
  */
 @Internal
-class DecimalBigDecimalConverter implements DataStructureConverter<DecimalData, BigDecimal> {
+public class DecimalBigDecimalConverter implements DataStructureConverter<DecimalData, BigDecimal> {
 
        private static final long serialVersionUID = 1L;
 
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/IdentityConverter.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/IdentityConverter.java
index 8d9c874..d274ae8 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/IdentityConverter.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/IdentityConverter.java
@@ -24,7 +24,7 @@ import org.apache.flink.annotation.Internal;
  * No-op converter that just forwards its input.
  */
 @Internal
-class IdentityConverter<I> implements DataStructureConverter<I, I> {
+public class IdentityConverter<I> implements DataStructureConverter<I, I> {
 
        private static final long serialVersionUID = 1L;
 
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampInstantConverter.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampInstantConverter.java
index fffefbe..c8921b7 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampInstantConverter.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampInstantConverter.java
@@ -26,7 +26,7 @@ import org.apache.flink.table.types.logical.LocalZonedTimestampType;
  * Converter for {@link LocalZonedTimestampType} of {@link java.time.Instant} external type.
  */
 @Internal
-class LocalZonedTimestampInstantConverter implements DataStructureConverter<TimestampData, java.time.Instant> {
+public class LocalZonedTimestampInstantConverter implements DataStructureConverter<TimestampData, java.time.Instant> {
 
        private static final long serialVersionUID = 1L;
 
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampIntConverter.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampIntConverter.java
index 349d34e..35fd54c 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampIntConverter.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampIntConverter.java
@@ -26,7 +26,7 @@ import org.apache.flink.table.types.logical.LocalZonedTimestampType;
  * Converter for {@link LocalZonedTimestampType} of {@link Integer} external type.
  */
 @Internal
-class LocalZonedTimestampIntConverter implements DataStructureConverter<TimestampData, Integer> {
+public class LocalZonedTimestampIntConverter implements DataStructureConverter<TimestampData, Integer> {
 
        private static final long serialVersionUID = 1L;
 
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampLongConverter.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampLongConverter.java
index 6ddb7eb..0281a51 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampLongConverter.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/LocalZonedTimestampLongConverter.java
@@ -26,7 +26,7 @@ import org.apache.flink.table.types.logical.LocalZonedTimestampType;
  * Converter for {@link LocalZonedTimestampType} of {@link Long} external type.
  */
 @Internal
-class LocalZonedTimestampLongConverter implements DataStructureConverter<TimestampData, Long> {
+public class LocalZonedTimestampLongConverter implements DataStructureConverter<TimestampData, Long> {
 
        private static final long serialVersionUID = 1L;
 
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/MapMapConverter.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/MapMapConverter.java
index 24131b0..406036c 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/MapMapConverter.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/MapMapConverter.java
@@ -35,7 +35,7 @@ import java.util.Map;
  * Converter for {@link MapType}/{@link MultisetType} of {@link Map} external type.
  */
 @Internal
-class MapMapConverter<K, V> implements DataStructureConverter<MapData, Map<K, V>> {
+public class MapMapConverter<K, V> implements DataStructureConverter<MapData, Map<K, V>> {
 
        private static final long serialVersionUID = 1L;
 
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/RawByteArrayConverter.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/RawByteArrayConverter.java
index db24407..7ce245e 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/RawByteArrayConverter.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/RawByteArrayConverter.java
@@ -28,7 +28,7 @@ import org.apache.flink.table.types.logical.RawType;
  * Converter for {@link RawType} of {@code byte[]} external type.
  */
 @Internal
-class RawByteArrayConverter<T> implements DataStructureConverter<RawValueData<T>, byte[]> {
+public class RawByteArrayConverter<T> implements DataStructureConverter<RawValueData<T>, byte[]> {
 
        private static final long serialVersionUID = 1L;
 
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/RawObjectConverter.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/RawObjectConverter.java
index d93756a..db3bafb 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/RawObjectConverter.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/RawObjectConverter.java
@@ -28,7 +28,7 @@ import org.apache.flink.table.types.logical.RawType;
  * Converter for {@link RawType} of object external type.
  */
 @Internal
-class RawObjectConverter<T> implements DataStructureConverter<RawValueData<T>, T> {
+public class RawObjectConverter<T> implements DataStructureConverter<RawValueData<T>, T> {
 
        private static final long serialVersionUID = 1L;
 
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/RowRowConverter.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/RowRowConverter.java
index c899ff3..b03197b 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/RowRowConverter.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/RowRowConverter.java
@@ -32,7 +32,7 @@ import java.util.stream.IntStream;
  * Converter for {@link RowType} of {@link Row} external type.
  */
 @Internal
-class RowRowConverter implements DataStructureConverter<RowData, Row> {
+public class RowRowConverter implements DataStructureConverter<RowData, Row> {
 
        private static final long serialVersionUID = 1L;
 
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/StringByteArrayConverter.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/StringByteArrayConverter.java
index 3a7736c..94de7e6 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/StringByteArrayConverter.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/StringByteArrayConverter.java
@@ -27,7 +27,7 @@ import org.apache.flink.table.types.logical.VarCharType;
  * Converter for {@link CharType}/{@link VarCharType} of {@code byte[]} external type.
  */
 @Internal
-class StringByteArrayConverter implements DataStructureConverter<StringData, byte[]> {
+public class StringByteArrayConverter implements DataStructureConverter<StringData, byte[]> {
 
        private static final long serialVersionUID = 1L;
 
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/StringStringConverter.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/StringStringConverter.java
index 290758b..1aed58b 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/StringStringConverter.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/StringStringConverter.java
@@ -27,7 +27,7 @@ import org.apache.flink.table.types.logical.VarCharType;
  * Converter for {@link CharType}/{@link VarCharType} of {@link String} external type.
  */
 @Internal
-class StringStringConverter implements DataStructureConverter<StringData, String> {
+public class StringStringConverter implements DataStructureConverter<StringData, String> {
 
        private static final long serialVersionUID = 1L;
 
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/StructuredObjectConverter.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/StructuredObjectConverter.java
index 2d21db8..aa9ba95 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/StructuredObjectConverter.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/StructuredObjectConverter.java
@@ -46,7 +46,7 @@ import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getFi
  */
 @Internal
 @SuppressWarnings("unchecked")
-class StructuredObjectConverter<T> implements DataStructureConverter<RowData, T> {
+public class StructuredObjectConverter<T> implements DataStructureConverter<RowData, T> {
 
        private static final long serialVersionUID = 1L;
 
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimeLocalTimeConverter.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimeLocalTimeConverter.java
index d418f25..77c6196 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimeLocalTimeConverter.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimeLocalTimeConverter.java
@@ -26,7 +26,7 @@ import org.apache.flink.table.types.logical.TimeType;
  * Converter for {@link TimeType} of {@link java.time.LocalTime} external type.
  */
 @Internal
-class TimeLocalTimeConverter implements DataStructureConverter<Integer, java.time.LocalTime> {
+public class TimeLocalTimeConverter implements DataStructureConverter<Integer, java.time.LocalTime> {
 
        private static final long serialVersionUID = 1L;
 
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimeLongConverter.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimeLongConverter.java
index 6cc79fb..d84dcfc 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimeLongConverter.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimeLongConverter.java
@@ -25,7 +25,7 @@ import org.apache.flink.table.types.logical.TimeType;
  * Converter for {@link TimeType} of {@link Long} external type.
  */
 @Internal
-class TimeLongConverter implements DataStructureConverter<Integer, Long> {
+public class TimeLongConverter implements DataStructureConverter<Integer, Long> {
 
        private static final long serialVersionUID = 1L;
 
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimeTimeConverter.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimeTimeConverter.java
index 1c8b34a..33293ea 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimeTimeConverter.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimeTimeConverter.java
@@ -26,7 +26,7 @@ import org.apache.flink.table.types.logical.TimeType;
  * Converter for {@link TimeType} of {@link java.sql.Time} external type.
  */
 @Internal
-class TimeTimeConverter implements DataStructureConverter<Integer, java.sql.Time> {
+public class TimeTimeConverter implements DataStructureConverter<Integer, java.sql.Time> {
 
        private static final long serialVersionUID = 1L;
 
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimestampLocalDateTimeConverter.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimestampLocalDateTimeConverter.java
index c156715..bce2bf7 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimestampLocalDateTimeConverter.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimestampLocalDateTimeConverter.java
@@ -26,7 +26,7 @@ import org.apache.flink.table.types.logical.TimestampType;
  * Converter for {@link TimestampType} of {@link java.time.LocalDateTime} external type.
  */
 @Internal
-class TimestampLocalDateTimeConverter implements DataStructureConverter<TimestampData, java.time.LocalDateTime> {
+public class TimestampLocalDateTimeConverter implements DataStructureConverter<TimestampData, java.time.LocalDateTime> {
 
        private static final long serialVersionUID = 1L;
 
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimestampTimestampConverter.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimestampTimestampConverter.java
index f9a72f0..66a4fcb 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimestampTimestampConverter.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/TimestampTimestampConverter.java
@@ -26,7 +26,7 @@ import org.apache.flink.table.types.logical.TimestampType;
  * Converter for {@link TimestampType} of {@link java.sql.Timestamp} external type.
  */
 @Internal
-class TimestampTimestampConverter implements DataStructureConverter<TimestampData, java.sql.Timestamp> {
+public class TimestampTimestampConverter implements DataStructureConverter<TimestampData, java.sql.Timestamp> {
 
        private static final long serialVersionUID = 1L;
 
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/YearMonthIntervalPeriodConverter.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/YearMonthIntervalPeriodConverter.java
index 50f8d01..ee871c8 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/YearMonthIntervalPeriodConverter.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/YearMonthIntervalPeriodConverter.java
@@ -30,7 +30,7 @@ import java.time.Period;
  * Converter for {@link YearMonthIntervalType} of {@link java.time.Period} external type.
  */
 @Internal
-class YearMonthIntervalPeriodConverter implements DataStructureConverter<Integer, java.time.Period> {
+public class YearMonthIntervalPeriodConverter implements DataStructureConverter<Integer, java.time.Period> {
 
        private static final long serialVersionUID = 1L;
 

Reply via email to