matriv commented on a change in pull request #17811:
URL: https://github.com/apache/flink/pull/17811#discussion_r756727589
##########
File path:
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java
##########
@@ -187,6 +198,148 @@ public void testStreamRecordTimestampInserterNotApplied()
{
assertPlan(tableEnv, "INSERT INTO T1 SELECT * FROM T1", false);
}
+ @Test
+ public void testCharPrecisionEnforcer() throws ExecutionException,
InterruptedException {
+ final StreamTableEnvironment tableEnv =
StreamTableEnvironment.create(env);
+ final List<Row> rows =
+ Arrays.asList(
+ Row.of(1, "Apache Flink", "SQL RuleZ", 11, 111, "SQL"),
+ Row.of(2, "Apache", "SQL", 22, 222, "Flink"),
+ Row.of(3, "Apache", "Flink SQL", 33, 333, "Apache
Flink SQL"),
+ Row.of(4, "Flink Project", "SQL or SeQueL?", 44, 444,
"Apache Flink SQL"));
+
+ final TableDescriptor sourceDescriptor =
+ TableFactoryHarness.newBuilder()
+ .schema(schemaForCharPrecisionEnforcer())
+ .source(new TestSource(rows))
+ .build();
+ tableEnv.createTable("T1", sourceDescriptor);
+
+ // Default config - ignore (no trim)
+ TableResult result = tableEnv.executeSql("SELECT * FROM T1");
+ result.await();
+
+ final List<String> expected =
rows.stream().map(Row::toString).collect(Collectors.toList());
+ final List<String> resultsAsString = new ArrayList<>();
+ result.collect().forEachRemaining(r ->
resultsAsString.add(r.toString()));
+ assertEquals(expected, resultsAsString);
+
+ try {
+ tableEnv.getConfig()
+ .getConfiguration()
+ .setString(
+ TABLE_EXEC_SINK_CHAR_PRECISION_ENFORCER.key(),
+
ExecutionConfigOptions.CharPrecisionEnforcer.TRIM.name());
+
+ result = tableEnv.executeSql("SELECT * FROM T1");
+ result.await();
+
+ final List<String> expectedTrimmed =
+ Arrays.asList(
+ "+I[1, Apache F, SQL R, 11, 111, SQL]",
+ "+I[2, Apache, SQL, 22, 222, Flink]",
+ "+I[3, Apache, Flink, 33, 333, Apache]",
+ "+I[4, Flink Pr, SQL o, 44, 444, Apache]");
+ final List<String> resultsAsStringStrimmed = new ArrayList<>();
+ result.collect().forEachRemaining(r ->
resultsAsStringStrimmed.add(r.toString()));
+ assertEquals(expectedTrimmed, resultsAsStringStrimmed);
+
+ } finally {
+ tableEnv.getConfig()
+ .getConfiguration()
+ .setString(
+ TABLE_EXEC_SINK_CHAR_PRECISION_ENFORCER.key(),
+
ExecutionConfigOptions.CharPrecisionEnforcer.IGNORE.name());
+ }
+ }
+
+ @Test
+ public void testNullEnforcer() throws ExecutionException,
InterruptedException {
+ final StreamTableEnvironment tableEnv =
StreamTableEnvironment.create(env);
+ final List<Row> rows =
+ Arrays.asList(
+ Row.of(1, "Apache", 11),
+ Row.of(2, null, 22),
+ Row.of(null, "Flink", 33),
+ Row.of(null, null, 44));
+
+ final SharedReference<List<RowData>> results = sharedObjects.add(new
ArrayList<>());
+ tableEnv.createTable(
+ "T1",
+ TableFactoryHarness.newBuilder()
+ .schema(schemaForNotNullEnforcer())
+ .source(new TestSource(rows))
+ .sink(
+ new TableFactoryHarness.SinkBase() {
+ @Override
+ public DynamicTableSink.SinkRuntimeProvider
+ getSinkRuntimeProvider(
+ DynamicTableSink.Context
context) {
+ return SinkProvider.of(
+ TestSink.newBuilder()
+ .setWriter(new
TestNotNullWriter(results))
+
.setCommittableSerializer(
+
TestSink.StringCommittableSerializer
+
.INSTANCE)
+ .build());
+ }
+ })
+ .build());
+
+ // Default config - ignore (no trim)
+ ExecutionException ee =
Review comment:
indeed!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]