This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit a332f8f51cfaf6fc3c4d88719ab000eef548e14f Author: Marios Trivyzas <[email protected]> AuthorDate: Fri Dec 3 10:23:30 2021 +0100 [FLINK-25111][table-api][table-planner] Add config option to determine CAST behaviour Add a new `ExecutionConfigOption`, so that users can choose between the legacy behaviour of CAST or the new one, including improvements and fixes. This closes #17985. --- .../generated/execution_config_configuration.html | 5 ++++ .../functions/AdvancedFunctionsExampleITCase.java | 18 ++++++------ .../table/api/config/ExecutionConfigOptions.java | 34 ++++++++++++++++++++++ .../planner/connectors/CollectDynamicSink.java | 11 +++++-- .../table/planner/connectors/DynamicSinkUtils.java | 6 +++- .../casting/AbstractCodeGeneratorCastRule.java | 5 ++++ .../AbstractExpressionCodeGeneratorCastRule.java | 5 ++++ .../table/planner/functions/casting/CastRule.java | 10 ++++++- .../functions/casting/CodeGeneratorCastRule.java | 4 +++ .../casting/RowDataToStringConverterImpl.java | 8 +++-- .../functions/casting/RowToStringCastRule.java | 13 ++++++++- .../flink/table/planner/codegen/CodeGenUtils.scala | 2 ++ .../planner/codegen/calls/ScalarOperatorGens.scala | 8 +++++ .../planner/functions/casting/CastRulesTest.java | 18 ++++++++++-- .../expressions/utils/ExpressionTestBase.scala | 6 ++++ 15 files changed, 133 insertions(+), 20 deletions(-) diff --git a/docs/layouts/shortcodes/generated/execution_config_configuration.html b/docs/layouts/shortcodes/generated/execution_config_configuration.html index e0c9c53..e809eb0 100644 --- a/docs/layouts/shortcodes/generated/execution_config_configuration.html +++ b/docs/layouts/shortcodes/generated/execution_config_configuration.html @@ -58,6 +58,11 @@ By default no operator is disabled.</td> <td><p>Enum</p></td> <td>Determines whether string values for columns with CHAR(<precision>)/VARCHAR(<precision>) types will be trimmed or padded (only for CHAR(<precision>)), so that their length will match the one defined by the precision of their respective CHAR/VARCHAR column type.<br /><br />Possible values:<ul><li>"IGNORE": Don't apply any trimming and padding, and instead ignore the CHAR/VARCHAR precision directive.</li><li>"TRIM_PAD": Trim and pad string values to match [...] </tr> + <td><h5>table.exec.sink.legacy-cast-behaviour</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td> + <td style="word-wrap: break-word;">ENABLED</td> + <td><p>Enum</p></td> + <td>Determines whether CAST will operate following the legacy behaviour or the new one that introduces various fixes and improvements.<br /><br />Possible values:<ul><li>"ENABLED": CAST will operate following the legacy behaviour.</li><li>"DISABLED": CAST will operate following the new correct behaviour.</li></ul></td> + </tr> <tr> <td><h5>table.exec.sink.not-null-enforcer</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td> <td style="word-wrap: break-word;">ERROR</td> diff --git a/flink-examples/flink-examples-table/src/test/java/org/apache/flink/table/examples/java/functions/AdvancedFunctionsExampleITCase.java b/flink-examples/flink-examples-table/src/test/java/org/apache/flink/table/examples/java/functions/AdvancedFunctionsExampleITCase.java index 3306911..4bc98c2 100644 --- a/flink-examples/flink-examples-table/src/test/java/org/apache/flink/table/examples/java/functions/AdvancedFunctionsExampleITCase.java +++ b/flink-examples/flink-examples-table/src/test/java/org/apache/flink/table/examples/java/functions/AdvancedFunctionsExampleITCase.java @@ -41,41 +41,41 @@ public class AdvancedFunctionsExampleITCase extends ExampleOutputTestBase { assertThat( consoleOutput, containsString( - "| Guillermo Smith | (5, 2020-12-05) |")); + "| Guillermo Smith | (5,2020-12-05) |")); assertThat( consoleOutput, containsString( - "| John Turner | (12, 2020-10-02) |")); + "| John Turner | (12,2020-10-02) |")); assertThat( consoleOutput, containsString( - "| Brandy Sanders | (1, 2020-10-14) |")); + "| Brandy Sanders | (1,2020-10-14) |")); assertThat( consoleOutput, containsString( - "| Valeria Mendoza | (10, 2020-06-02) |")); + "| Valeria Mendoza | (10,2020-06-02) |")); assertThat( consoleOutput, containsString( - "| Ellen Ortega | (100, 2020-06-18) |")); + "| Ellen Ortega | (100,2020-06-18) |")); assertThat( consoleOutput, containsString( - "| Leann Holloway | (9, 2020-05-26) |")); + "| Leann Holloway | (9,2020-05-26) |")); } private void testExecuteInternalRowMergerFunction(String consoleOutput) { assertThat( consoleOutput, containsString( - "| Guillermo Smith | (1992-12-12, New Jersey, 81... |")); + "| Guillermo Smith | (1992-12-12,New Jersey,816-... |")); assertThat( consoleOutput, containsString( - "| Valeria Mendoza | (1970-03-28, Los Angeles, 9... |")); + "| Valeria Mendoza | (1970-03-28,Los Angeles,928... |")); assertThat( consoleOutput, containsString( - "| Leann Holloway | (1989-05-21, Eugene, 614-88... |")); + "| Leann Holloway | (1989-05-21,Eugene,614-889-... |")); } } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java index 5efa77f..0a8d417 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java @@ -386,6 +386,15 @@ public class ExecutionConfigOptions { + "Pipelined shuffle means data will be sent to consumer tasks once produced.") .build()); + @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING) + public static final ConfigOption<LegacyCastBehaviour> TABLE_EXEC_LEGACY_CAST_BEHAVIOUR = + key("table.exec.sink.legacy-cast-behaviour") + .enumType(LegacyCastBehaviour.class) + .defaultValue(LegacyCastBehaviour.ENABLED) + .withDescription( + "Determines whether CAST will operate following the legacy behaviour " + + "or the new one that introduces various fixes and improvements."); + // ------------------------------------------------------------------------------------------ // Enum option types // ------------------------------------------------------------------------------------------ @@ -453,4 +462,29 @@ public class ExecutionConfigOptions { /** Add materialize operator in any case. */ FORCE } + + /** Determine if CAST operates using the legacy behaviour or the new one. */ + @Deprecated + public enum LegacyCastBehaviour implements DescribedEnum { + ENABLED(true, text("CAST will operate following the legacy behaviour.")), + DISABLED(false, text("CAST will operate following the new correct behaviour.")); + + private final boolean enabled; + private final InlineElement description; + + LegacyCastBehaviour(boolean enabled, InlineElement description) { + this.enabled = enabled; + this.description = description; + } + + @Internal + @Override + public InlineElement getDescription() { + return description; + } + + public boolean isEnabled() { + return enabled; + } + } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/CollectDynamicSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/CollectDynamicSink.java index 575fab7..98fcf8b 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/CollectDynamicSink.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/CollectDynamicSink.java @@ -57,6 +57,7 @@ final class CollectDynamicSink implements DynamicTableSink { private final Duration socketTimeout; private final ClassLoader classLoader; private final ZoneId sessionZoneId; + private final boolean legacyCastBehaviour; // mutable attributes private CollectResultIterator<RowData> iterator; @@ -68,18 +69,21 @@ final class CollectDynamicSink implements DynamicTableSink { MemorySize maxBatchSize, Duration socketTimeout, ClassLoader classLoader, - ZoneId sessionZoneId) { + ZoneId sessionZoneId, + boolean legacyCastBehaviour) { this.tableIdentifier = tableIdentifier; this.consumedDataType = consumedDataType; this.maxBatchSize = maxBatchSize; this.socketTimeout = socketTimeout; this.classLoader = classLoader; this.sessionZoneId = sessionZoneId; + this.legacyCastBehaviour = legacyCastBehaviour; } public ResultProvider getSelectResultProvider() { return new CollectResultProvider( - new RowDataToStringConverterImpl(consumedDataType, sessionZoneId, classLoader)); + new RowDataToStringConverterImpl( + consumedDataType, sessionZoneId, classLoader, legacyCastBehaviour)); } @Override @@ -132,7 +136,8 @@ final class CollectDynamicSink implements DynamicTableSink { maxBatchSize, socketTimeout, classLoader, - sessionZoneId); + sessionZoneId, + legacyCastBehaviour); } @Override diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java index 87c432e..acb557a 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java @@ -25,6 +25,7 @@ import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFacto import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.api.config.ExecutionConfigOptions; import org.apache.flink.table.api.config.TableConfigOptions; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.Column; @@ -115,7 +116,10 @@ public final class DynamicSinkUtils { configuration.get(CollectSinkOperatorFactory.MAX_BATCH_SIZE), configuration.get(CollectSinkOperatorFactory.SOCKET_TIMEOUT), classLoader, - zoneId); + zoneId, + configuration + .get(ExecutionConfigOptions.TABLE_EXEC_LEGACY_CAST_BEHAVIOUR) + .isEnabled()); collectModifyOperation.setSelectResultProvider(tableSink.getSelectResultProvider()); collectModifyOperation.setConsumedDataType(consumedDataType); return convertSinkToRel( diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractCodeGeneratorCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractCodeGeneratorCastRule.java index 9e068b8..209ee8d 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractCodeGeneratorCastRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractCodeGeneratorCastRule.java @@ -182,6 +182,11 @@ abstract class AbstractCodeGeneratorCastRule<IN, OUT> extends AbstractCastRule<I } @Override + public boolean legacyBehaviour() { + return castRuleCtx.legacyBehaviour(); + } + + @Override public String getSessionTimeZoneTerm() { return "java.util.TimeZone.getTimeZone(\"" + castRuleCtx.getSessionZoneId().getId() diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractExpressionCodeGeneratorCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractExpressionCodeGeneratorCastRule.java index 52c0b85..a737678 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractExpressionCodeGeneratorCastRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractExpressionCodeGeneratorCastRule.java @@ -104,6 +104,11 @@ abstract class AbstractExpressionCodeGeneratorCastRule<IN, OUT> CastRule.Context ctx) { return new CodeGeneratorCastRule.Context() { @Override + public boolean legacyBehaviour() { + return ctx.legacyBehaviour(); + } + + @Override public String getSessionTimeZoneTerm() { return "java.util.TimeZone.getTimeZone(\"" + ctx.getSessionZoneId().getId() + "\")"; } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRule.java index 58217e4..78d6f2d 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRule.java @@ -49,14 +49,22 @@ public interface CastRule<IN, OUT> { /** Casting context. */ interface Context { + @Deprecated + boolean legacyBehaviour(); + ZoneId getSessionZoneId(); ClassLoader getClassLoader(); /** Create a casting context. */ - static Context create(ZoneId zoneId, ClassLoader classLoader) { + static Context create(boolean legacyBehaviour, ZoneId zoneId, ClassLoader classLoader) { return new Context() { @Override + public boolean legacyBehaviour() { + return legacyBehaviour; + } + + @Override public ZoneId getSessionZoneId() { return zoneId; } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CodeGeneratorCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CodeGeneratorCastRule.java index 2f9de82..1b189fa 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CodeGeneratorCastRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CodeGeneratorCastRule.java @@ -44,6 +44,10 @@ public interface CodeGeneratorCastRule<IN, OUT> extends CastRule<IN, OUT> { /** Context for code generation. */ interface Context { + /** @return where the legacy behaviour should be followed or not. */ + @Deprecated + boolean legacyBehaviour(); + /** @return the session time zone term */ String getSessionTimeZoneTerm(); diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/RowDataToStringConverterImpl.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/RowDataToStringConverterImpl.java index 62c8e53..b3be464 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/RowDataToStringConverterImpl.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/RowDataToStringConverterImpl.java @@ -45,11 +45,13 @@ public final class RowDataToStringConverterImpl implements RowDataToStringConver this( dataType, DateTimeUtils.UTC_ZONE.toZoneId(), - Thread.currentThread().getContextClassLoader()); + Thread.currentThread().getContextClassLoader(), + true); } @SuppressWarnings("unchecked") - public RowDataToStringConverterImpl(DataType dataType, ZoneId zoneId, ClassLoader classLoader) { + public RowDataToStringConverterImpl( + DataType dataType, ZoneId zoneId, ClassLoader classLoader, boolean legacyBehaviour) { List<DataType> rowDataTypes = DataType.getFieldDataTypes(dataType); this.columnConverters = new Function[rowDataTypes.size()]; @@ -60,7 +62,7 @@ public final class RowDataToStringConverterImpl implements RowDataToStringConver CastExecutor<Object, StringData> castExecutor = (CastExecutor<Object, StringData>) CastRuleProvider.create( - CastRule.Context.create(zoneId, classLoader), + CastRule.Context.create(legacyBehaviour, zoneId, classLoader), fieldType, STRING().getLogicalType()); if (castExecutor == null) { diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/RowToStringCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/RowToStringCastRule.java index baa5119..911d262 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/RowToStringCastRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/RowToStringCastRule.java @@ -125,7 +125,8 @@ class RowToStringCastRule extends AbstractNullAwareCodeGeneratorCastRule<ArrayDa // Write the comma if (fieldIndex != 0) { - writer.stmt(methodCall(builderTerm, "append", strLiteral(", "))); + final String comma = getDelimiter(context); + writer.stmt(methodCall(builderTerm, "append", comma)); } writer @@ -167,4 +168,14 @@ class RowToStringCastRule extends AbstractNullAwareCodeGeneratorCastRule<ArrayDa return writer.toString(); } + + private String getDelimiter(CodeGeneratorCastRule.Context context) { + final String comma; + if (context.legacyBehaviour()) { + comma = strLiteral(","); + } else { + comma = strLiteral(", "); + } + return comma; + } } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala index b21d097..7a72e25 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala @@ -54,6 +54,8 @@ object CodeGenUtils { // ------------------------------- DEFAULT TERMS ------------------------------------------ + val DEFAULT_LEGACY_CAST_BEHAVIOUR = "legacyCastBehaviour" + val DEFAULT_TIMEZONE_TERM = "timeZone" val DEFAULT_INPUT1_TERM = "in1" diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala index fd5da47..045976c 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala @@ -19,6 +19,7 @@ package org.apache.flink.table.planner.codegen.calls import org.apache.flink.table.api.ValidationException +import org.apache.flink.table.api.config.ExecutionConfigOptions import org.apache.flink.table.data.binary.BinaryArrayData import org.apache.flink.table.data.util.MapDataUtil import org.apache.flink.table.data.utils.CastExecutor @@ -2162,6 +2163,7 @@ object ScalarOperatorGens { def toCodegenCastContext(ctx: CodeGeneratorContext): CodeGeneratorCastRule.Context = { new CodeGeneratorCastRule.Context { + override def legacyBehaviour(): Boolean = isLegacyCastBehaviourEnabled(ctx) override def getSessionTimeZoneTerm: String = ctx.addReusableSessionTimeZone() override def declareVariable(ty: String, variablePrefix: String): String = ctx.addReusableLocalVariable(ty, variablePrefix) @@ -2176,10 +2178,16 @@ object ScalarOperatorGens { def toCastContext(ctx: CodeGeneratorContext): CastRule.Context = { new CastRule.Context { + override def legacyBehaviour(): Boolean = isLegacyCastBehaviourEnabled(ctx) + override def getSessionZoneId: ZoneId = ctx.tableConfig.getLocalTimeZone override def getClassLoader: ClassLoader = Thread.currentThread().getContextClassLoader } } + private def isLegacyCastBehaviourEnabled(ctx: CodeGeneratorContext) = { + ctx.tableConfig + .getConfiguration.get(ExecutionConfigOptions.TABLE_EXEC_LEGACY_CAST_BEHAVIOUR).isEnabled + } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java index a0c66b5..6f74b45 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java @@ -95,7 +95,7 @@ class CastRulesTest { private static final ZoneId CET = ZoneId.of("CET"); private static final CastRule.Context CET_CONTEXT = - CastRule.Context.create(CET, Thread.currentThread().getContextClassLoader()); + CastRule.Context.create(false, CET, Thread.currentThread().getContextClassLoader()); private static final byte DEFAULT_POSITIVE_TINY_INT = (byte) 5; private static final byte DEFAULT_NEGATIVE_TINY_INT = (byte) -5; @@ -614,7 +614,14 @@ class CastRulesTest { ROW(FIELD("f0", STRING()), FIELD("f1", STRING())), GenericRowData.of( StringData.fromString("abc"), StringData.fromString("def")), - StringData.fromString("(abc, def)")) + StringData.fromString("(abc, def)"), + false) + .fromCase( + ROW(FIELD("f0", STRING()), FIELD("f1", STRING())), + GenericRowData.of( + StringData.fromString("abc"), StringData.fromString("def")), + StringData.fromString("(abc,def)"), + true) .fromCase( ROW(FIELD("f0", INT().nullable()), FIELD("f1", STRING())), GenericRowData.of(null, StringData.fromString("abc")), @@ -860,9 +867,15 @@ class CastRulesTest { } private CastTestSpecBuilder fromCase(DataType dataType, Object src, Object target) { + return fromCase(dataType, src, target, false); + } + + private CastTestSpecBuilder fromCase( + DataType dataType, Object src, Object target, boolean legacyBehaviour) { return fromCase( dataType, CastRule.Context.create( + legacyBehaviour, DateTimeUtils.UTC_ZONE.toZoneId(), Thread.currentThread().getContextClassLoader()), src, @@ -900,6 +913,7 @@ class CastRulesTest { return fail( dataType, CastRule.Context.create( + false, DateTimeUtils.UTC_ZONE.toZoneId(), Thread.currentThread().getContextClassLoader()), src, diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala index 5204b94..b62f158 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala @@ -33,6 +33,7 @@ import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.flink.table.api import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl +import org.apache.flink.table.api.config.ExecutionConfigOptions import org.apache.flink.table.api.{EnvironmentSettings, TableConfig, TableException, ValidationException} import org.apache.flink.table.data.RowData import org.apache.flink.table.data.binary.BinaryRowData @@ -50,6 +51,7 @@ import org.apache.flink.table.types.AbstractDataType import org.apache.flink.table.types.logical.{RowType, VarCharType} import org.apache.flink.table.types.utils.TypeConversions import org.apache.flink.types.Row + import org.junit.Assert.{assertEquals, assertTrue, fail} import org.junit.rules.ExpectedException import org.junit.{After, Before, Rule} @@ -98,6 +100,10 @@ abstract class ExpressionTestBase { @Before def prepare(): Unit = { + config.getConfiguration.set( + ExecutionConfigOptions.TABLE_EXEC_LEGACY_CAST_BEHAVIOUR, + ExecutionConfigOptions.LegacyCastBehaviour.DISABLED + ) if (containsLegacyTypes) { val ds = env.fromCollection(Collections.emptyList[Row](), typeInfo) tEnv.createTemporaryView(tableName, ds, typeInfo.getFieldNames.map(api.$): _*)
