This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new bfe6f19173f [FLINK-25809][table] Improve readability of TableTestPrograms bfe6f19173f is described below commit bfe6f19173fc89e552a04edcd24b508af6155d93 Author: Timo Walther <twal...@apache.org> AuthorDate: Tue Oct 31 12:13:22 2023 +0100 [FLINK-25809][table] Improve readability of TableTestPrograms --- .../flink/table/test/program/SinkTestStep.java | 60 ++++++ .../flink/table/test/program/SourceTestStep.java | 42 +++++ .../flink/table/test/program/TableTestProgram.java | 208 ++------------------- .../flink/table/test/program/TableTestStep.java | 61 ++++++ .../nodes/exec/testutils/CalcTestPrograms.java | 24 ++- .../test/program/TableTestProgramRunnerTest.java | 21 ++- 6 files changed, 207 insertions(+), 209 deletions(-) diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/SinkTestStep.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/SinkTestStep.java index d195d65c144..11ae1bb055d 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/SinkTestStep.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/SinkTestStep.java @@ -22,6 +22,7 @@ import org.apache.flink.types.Row; import javax.annotation.Nullable; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -58,6 +59,11 @@ public final class SinkTestStep extends TableTestStep { this.expectedAfterRestoreStrings = expectedAfterRestoreStrings; } + /** Builder for creating a {@link SinkTestStep}. */ + public static SinkTestStep.Builder newBuilder(String name) { + return new SinkTestStep.Builder(name); + } + public List<String> getExpectedBeforeRestoreAsStrings() { if (expectedBeforeRestoreStrings != null) { return expectedBeforeRestoreStrings; @@ -90,4 +96,58 @@ public final class SinkTestStep extends TableTestStep { ? TestKind.SINK_WITH_DATA : TestKind.SINK_WITH_RESTORE_DATA; } + + /** Builder pattern for {@link SinkTestStep}. */ + public static final class Builder extends AbstractBuilder<Builder> { + + private List<Row> expectedBeforeRestore; + private List<Row> expectedAfterRestore; + + private List<String> expectedBeforeRestoreStrings; + private List<String> expectedAfterRestoreStrings; + + private Builder(String name) { + super(name); + } + + public Builder consumedValues(Row... expectedRows) { + return consumedBeforeRestore(expectedRows); + } + + public Builder consumedValues(String... expectedRows) { + return consumedBeforeRestore(expectedRows); + } + + public Builder consumedBeforeRestore(Row... expectedRows) { + this.expectedBeforeRestore = Arrays.asList(expectedRows); + return this; + } + + public Builder consumedBeforeRestore(String... expectedRows) { + this.expectedBeforeRestoreStrings = Arrays.asList(expectedRows); + return this; + } + + public Builder consumedAfterRestore(Row... expectedRows) { + this.expectedAfterRestore = Arrays.asList(expectedRows); + return this; + } + + public Builder consumedAfterRestore(String... expectedRows) { + this.expectedAfterRestoreStrings = Arrays.asList(expectedRows); + return this; + } + + public SinkTestStep build() { + return new SinkTestStep( + name, + schemaComponents, + partitionKeys, + options, + expectedBeforeRestore, + expectedAfterRestore, + expectedBeforeRestoreStrings, + expectedAfterRestoreStrings); + } + } } diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/SourceTestStep.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/SourceTestStep.java index eec3b1677b0..6653eb174c5 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/SourceTestStep.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/SourceTestStep.java @@ -20,6 +20,8 @@ package org.apache.flink.table.test.program; import org.apache.flink.types.Row; +import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Map; @@ -41,6 +43,11 @@ public final class SourceTestStep extends TableTestStep { this.dataAfterRestore = dataAfterRestore; } + /** Builder for creating a {@link SourceTestStep}. */ + public static Builder newBuilder(String name) { + return new Builder(name); + } + @Override public TestKind getKind() { return dataBeforeRestore.isEmpty() @@ -49,4 +56,39 @@ public final class SourceTestStep extends TableTestStep { ? TestKind.SOURCE_WITH_DATA : TestKind.SOURCE_WITH_RESTORE_DATA; } + + /** Builder pattern for {@link SourceTestStep}. */ + public static final class Builder extends AbstractBuilder<Builder> { + + private final List<Row> dataBeforeRestore = new ArrayList<>(); + private final List<Row> dataAfterRestore = new ArrayList<>(); + + private Builder(String name) { + super(name); + } + + public Builder producedValues(Row... data) { + return producedBeforeRestore(data); + } + + public Builder producedBeforeRestore(Row... data) { + this.dataBeforeRestore.addAll(Arrays.asList(data)); + return this; + } + + public Builder producedAfterRestore(Row... data) { + this.dataAfterRestore.addAll(Arrays.asList(data)); + return this; + } + + public SourceTestStep build() { + return new SourceTestStep( + name, + schemaComponents, + partitionKeys, + options, + dataBeforeRestore, + dataAfterRestore); + } + } } diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestProgram.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestProgram.java index efec060315d..731c967bba1 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestProgram.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestProgram.java @@ -19,21 +19,16 @@ package org.apache.flink.table.test.program; import org.apache.flink.configuration.ConfigOption; -import org.apache.flink.configuration.ConfigurationUtils; -import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.functions.UserDefinedFunction; import org.apache.flink.table.test.program.FunctionTestStep.FunctionBehavior; import org.apache.flink.table.test.program.FunctionTestStep.FunctionPersistence; import org.apache.flink.table.test.program.TestStep.TestKind; -import org.apache.flink.types.Row; import org.apache.flink.util.Preconditions; import java.util.ArrayList; import java.util.Arrays; import java.util.EnumSet; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.stream.Collectors; /** @@ -250,205 +245,40 @@ public class TableTestProgram { return this; } - /** Setup step for building a table source. */ - public SourceBuilder setupTableSource(String name) { - return new SourceBuilder(name, setupSteps, this); - } - - /** Setup step for building a table sink. */ - public SinkBuilder setupTableSink(String name) { - return new SinkBuilder(name, setupSteps, this); - } - - /** Run step for executing SQL. */ - public Builder runSql(String sql) { - this.runSteps.add(new SqlTestStep(sql)); - return this; - } - - /** Run step for executing a statement set. */ - public StatementSetBuilder runStatementSet() { - return new StatementSetBuilder(runSteps, this); - } - - public TableTestProgram build() { - return new TableTestProgram(id, description, setupSteps, runSteps); - } - } - - /** Builder pattern for {@link SourceTestStep} and {@link SinkTestStep}. */ - @SuppressWarnings("unchecked") - private static class TableBuilder<SpecificBuilder extends TableBuilder<SpecificBuilder>> { - - protected final String name; - protected final List<TestStep> targetSteps; - protected final Builder rootBuilder; - - protected final List<String> schemaComponents = new ArrayList<>(); - protected final List<String> partitionKeys = new ArrayList<>(); - protected final Map<String, String> options = new HashMap<>(); - - private TableBuilder(String name, List<TestStep> targetSteps, Builder rootBuilder) { - this.name = name; - this.targetSteps = targetSteps; - this.rootBuilder = rootBuilder; - } - - /** - * Define the schema like you would in SQL e.g. "my_col INT", "PRIMARY KEY (uid) NOT - * ENFORCED", or "WATERMARK FOR ts AS ts". - */ - public SpecificBuilder withSchema(String... schemaComponents) { - this.schemaComponents.addAll(Arrays.asList(schemaComponents)); - return (SpecificBuilder) this; - } - /** - * Unless the test requires a very specific configuration, try to avoid calling this method - * and fill in options later via {@link TableTestStep#apply(TableEnvironment, Map)}. - */ - public SpecificBuilder withOptions(Map<String, String> options) { - this.options.putAll(options); - return (SpecificBuilder) this; - } - - /** - * Unless the test requires a very specific configuration, try to avoid calling this method - * and fill in options later via {@link TableTestStep#apply(TableEnvironment, Map)}. + * Setup step for a table source. + * + * <p>Use {@link SourceTestStep.Builder} to construct this step. */ - public SpecificBuilder withOption(String key, String value) { - this.options.put(key, value); - return (SpecificBuilder) this; + public Builder setupTableSource(SourceTestStep sourceTestStep) { + setupSteps.add(sourceTestStep); + return this; } /** - * Unless the test requires a very specific configuration, try to avoid calling this method - * and fill in options later via {@link TableTestStep#apply(TableEnvironment, Map)}. + * Setup step for a table sink. + * + * <p>Use {@link SinkTestStep.Builder} to construct this step. */ - public <T> SpecificBuilder withOption(ConfigOption<T> option, String value) { - this.options.put(option.key(), ConfigurationUtils.convertValue(value, String.class)); - return (SpecificBuilder) this; - } - - public SpecificBuilder withPartitionKeys(String... partitionKeys) { - this.partitionKeys.addAll(Arrays.asList(partitionKeys)); - return (SpecificBuilder) this; - } - } - - /** Builder pattern for {@link SourceTestStep}. */ - public static class SourceBuilder extends TableBuilder<SourceBuilder> { - - private final List<Row> dataBeforeRestore = new ArrayList<>(); - private final List<Row> dataAfterRestore = new ArrayList<>(); - - private SourceBuilder(String name, List<TestStep> targetSteps, Builder rootBuilder) { - super(name, targetSteps, rootBuilder); - } - - public SourceBuilder withValues(Row... data) { - return withValuesBeforeRestore(data); - } - - public SourceBuilder withValuesBeforeRestore(Row... data) { - this.dataBeforeRestore.addAll(Arrays.asList(data)); - return this; - } - - public SourceBuilder withValuesAfterRestore(Row... data) { - this.dataAfterRestore.addAll(Arrays.asList(data)); - return this; - } - - public Builder complete() { - targetSteps.add( - new SourceTestStep( - name, - schemaComponents, - partitionKeys, - options, - dataBeforeRestore, - dataAfterRestore)); - return rootBuilder; - } - } - - /** Builder pattern for {@link SinkTestStep}. */ - public static class SinkBuilder extends TableBuilder<SinkBuilder> { - - private List<Row> expectedBeforeRestore; - private List<Row> expectedAfterRestore; - - private List<String> expectedBeforeRestoreStrings; - private List<String> expectedAfterRestoreStrings; - - private SinkBuilder(String name, List<TestStep> targetSteps, Builder rootBuilder) { - super(name, targetSteps, rootBuilder); - } - - public SinkBuilder withExpectedValues(Row... expectedRows) { - return withValuesBeforeRestore(expectedRows); - } - - public SinkBuilder withExpectedValues(String... expectedRows) { - return withValuesBeforeRestore(expectedRows); - } - - public SinkBuilder withValuesBeforeRestore(Row... expectedRows) { - this.expectedBeforeRestore = Arrays.asList(expectedRows); - return this; - } - - public SinkBuilder withValuesBeforeRestore(String... expectedRows) { - this.expectedBeforeRestoreStrings = Arrays.asList(expectedRows); + public Builder setupTableSink(SinkTestStep sinkTestStep) { + setupSteps.add(sinkTestStep); return this; } - public SinkBuilder withValuesAfterRestore(Row... expectedRows) { - this.expectedAfterRestore = Arrays.asList(expectedRows); - return this; - } - - public SinkBuilder withValuesAfterRestore(String... expectedRows) { - this.expectedAfterRestoreStrings = Arrays.asList(expectedRows); + /** Run step for executing SQL. */ + public Builder runSql(String sql) { + this.runSteps.add(new SqlTestStep(sql)); return this; } - public Builder complete() { - targetSteps.add( - new SinkTestStep( - name, - schemaComponents, - partitionKeys, - options, - expectedBeforeRestore, - expectedAfterRestore, - expectedBeforeRestoreStrings, - expectedAfterRestoreStrings)); - return rootBuilder; - } - } - - /** Builder pattern for {@link StatementSetTestStep}. */ - public static class StatementSetBuilder { - - private final List<TestStep> targetSteps; - private final Builder rootBuilder; - private final List<String> statements = new ArrayList<>(); - - private StatementSetBuilder(List<TestStep> targetSteps, Builder rootBuilder) { - this.targetSteps = targetSteps; - this.rootBuilder = rootBuilder; - } - - public StatementSetBuilder withSql(String sql) { - this.statements.add(sql); + /** Run step for executing a statement set. */ + public Builder runStatementSet(String... sql) { + this.runSteps.add(new StatementSetTestStep(Arrays.asList(sql))); return this; } - public Builder complete() { - this.targetSteps.add(new StatementSetTestStep(statements)); - return rootBuilder; + public TableTestProgram build() { + return new TableTestProgram(id, description, setupSteps, runSteps); } } } diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestStep.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestStep.java index bff700d9a15..1d0207f7126 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestStep.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestStep.java @@ -18,9 +18,13 @@ package org.apache.flink.table.test.program; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigurationUtils; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.TableResult; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -70,4 +74,61 @@ public abstract class TableTestStep implements TestStep { return env.executeSql(createTable); } + + /** Builder pattern for {@link SourceTestStep} and {@link SinkTestStep}. */ + @SuppressWarnings("unchecked") + protected abstract static class AbstractBuilder< + SpecificBuilder extends AbstractBuilder<SpecificBuilder>> { + + protected final String name; + + protected final List<String> schemaComponents = new ArrayList<>(); + protected final List<String> partitionKeys = new ArrayList<>(); + protected final Map<String, String> options = new HashMap<>(); + + protected AbstractBuilder(String name) { + this.name = name; + } + + /** + * Define the schema like you would in SQL e.g. "my_col INT", "PRIMARY KEY (uid) NOT + * ENFORCED", or "WATERMARK FOR ts AS ts". + */ + public SpecificBuilder addSchema(String... schemaComponents) { + this.schemaComponents.addAll(Arrays.asList(schemaComponents)); + return (SpecificBuilder) this; + } + + /** + * Unless the test requires a very specific configuration, try to avoid calling this method + * and fill in options later via {@link TableTestStep#apply(TableEnvironment, Map)}. + */ + public SpecificBuilder addOptions(Map<String, String> options) { + this.options.putAll(options); + return (SpecificBuilder) this; + } + + /** + * Unless the test requires a very specific configuration, try to avoid calling this method + * and fill in options later via {@link TableTestStep#apply(TableEnvironment, Map)}. + */ + public SpecificBuilder addOption(String key, String value) { + this.options.put(key, value); + return (SpecificBuilder) this; + } + + /** + * Unless the test requires a very specific configuration, try to avoid calling this method + * and fill in options later via {@link TableTestStep#apply(TableEnvironment, Map)}. + */ + public <T> SpecificBuilder addOption(ConfigOption<T> option, String value) { + this.options.put(option.key(), ConfigurationUtils.convertValue(value, String.class)); + return (SpecificBuilder) this; + } + + public SpecificBuilder addPartitionKeys(String... partitionKeys) { + this.partitionKeys.addAll(Arrays.asList(partitionKeys)); + return (SpecificBuilder) this; + } + } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/CalcTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/CalcTestPrograms.java index 534d50a26bb..feb151ba9ef 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/CalcTestPrograms.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/CalcTestPrograms.java @@ -19,6 +19,8 @@ package org.apache.flink.table.planner.plan.nodes.exec.testutils; import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc; +import org.apache.flink.table.test.program.SinkTestStep; +import org.apache.flink.table.test.program.SourceTestStep; import org.apache.flink.table.test.program.TableTestProgram; import org.apache.flink.types.Row; @@ -27,16 +29,18 @@ public class CalcTestPrograms { static final TableTestProgram SIMPLE_CALC = TableTestProgram.of("calc-simple", "validates basic calc node") - .setupTableSource("t") - .withSchema("a BIGINT", "b DOUBLE") - .withValuesBeforeRestore(Row.of(420L, 42.0)) - .withValuesAfterRestore(Row.of(421L, 42.1)) - .complete() - .setupTableSink("sink_t") - .withSchema("a BIGINT", "b DOUBLE") - .withValuesBeforeRestore(Row.of(421L, 42.0)) - .withValuesAfterRestore(Row.of(422L, 42.1)) - .complete() + .setupTableSource( + SourceTestStep.newBuilder("t") + .addSchema("a BIGINT", "b DOUBLE") + .producedBeforeRestore(Row.of(420L, 42.0)) + .producedAfterRestore(Row.of(421L, 42.1)) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema("a BIGINT", "b DOUBLE") + .consumedBeforeRestore(Row.of(421L, 42.0)) + .consumedAfterRestore(Row.of(422L, 42.1)) + .build()) .runSql("INSERT INTO sink_t SELECT a + 1, b FROM t") .build(); } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/test/program/TableTestProgramRunnerTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/test/program/TableTestProgramRunnerTest.java index e7b58aaa864..a0d114c1178 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/test/program/TableTestProgramRunnerTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/test/program/TableTestProgramRunnerTest.java @@ -97,14 +97,16 @@ public class TableTestProgramRunnerTest { void testTableStep() { final TableTestProgram program = TableTestProgram.of(ID, DESCRIPTION) - .setupTableSource("MyTableSource") - .withSchema("i INT") - .withOption("connector", "datagen") - .complete() - .setupTableSource("MyTableSink") - .withSchema("i INT") - .withOption("connector", "blackhole") - .complete() + .setupTableSource( + SourceTestStep.newBuilder("MyTableSource") + .addSchema("i INT") + .addOption("connector", "datagen") + .build()) + .setupTableSink( + SinkTestStep.newBuilder("MyTableSink") + .addSchema("i INT") + .addOption("connector", "blackhole") + .build()) .build(); assertThat(program.setupSteps).hasSize(2); @@ -127,8 +129,7 @@ public class TableTestProgramRunnerTest { "CREATE TABLE `default_catalog`.`default_database`.`MyTableSink` (\n" + " `i` INT\n" + ") WITH (\n" - + " 'connector' = 'blackhole',\n" - + " 'number-of-rows' = '3'\n" + + " 'connector' = 'blackhole'\n" + ")\n"); }