This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new bfe6f19173f [FLINK-25809][table] Improve readability of TableTestPrograms
bfe6f19173f is described below

commit bfe6f19173fc89e552a04edcd24b508af6155d93
Author: Timo Walther <twal...@apache.org>
AuthorDate: Tue Oct 31 12:13:22 2023 +0100

    [FLINK-25809][table] Improve readability of TableTestPrograms
---
 .../flink/table/test/program/SinkTestStep.java     |  60 ++++++
 .../flink/table/test/program/SourceTestStep.java   |  42 +++++
 .../flink/table/test/program/TableTestProgram.java | 208 ++-------------------
 .../flink/table/test/program/TableTestStep.java    |  61 ++++++
 .../nodes/exec/testutils/CalcTestPrograms.java     |  24 ++-
 .../test/program/TableTestProgramRunnerTest.java   |  21 ++-
 6 files changed, 207 insertions(+), 209 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/SinkTestStep.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/SinkTestStep.java
index d195d65c144..11ae1bb055d 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/SinkTestStep.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/SinkTestStep.java
@@ -22,6 +22,7 @@ import org.apache.flink.types.Row;
 
 import javax.annotation.Nullable;
 
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -58,6 +59,11 @@ public final class SinkTestStep extends TableTestStep {
         this.expectedAfterRestoreStrings = expectedAfterRestoreStrings;
     }
 
+    /** Builder for creating a {@link SinkTestStep}. */
+    public static SinkTestStep.Builder newBuilder(String name) {
+        return new SinkTestStep.Builder(name);
+    }
+
     public List<String> getExpectedBeforeRestoreAsStrings() {
         if (expectedBeforeRestoreStrings != null) {
             return expectedBeforeRestoreStrings;
@@ -90,4 +96,58 @@ public final class SinkTestStep extends TableTestStep {
                         ? TestKind.SINK_WITH_DATA
                         : TestKind.SINK_WITH_RESTORE_DATA;
     }
+
+    /** Builder pattern for {@link SinkTestStep}. */
+    public static final class Builder extends AbstractBuilder<Builder> {
+
+        private List<Row> expectedBeforeRestore;
+        private List<Row> expectedAfterRestore;
+
+        private List<String> expectedBeforeRestoreStrings;
+        private List<String> expectedAfterRestoreStrings;
+
+        private Builder(String name) {
+            super(name);
+        }
+
+        public Builder consumedValues(Row... expectedRows) {
+            return consumedBeforeRestore(expectedRows);
+        }
+
+        public Builder consumedValues(String... expectedRows) {
+            return consumedBeforeRestore(expectedRows);
+        }
+
+        public Builder consumedBeforeRestore(Row... expectedRows) {
+            this.expectedBeforeRestore = Arrays.asList(expectedRows);
+            return this;
+        }
+
+        public Builder consumedBeforeRestore(String... expectedRows) {
+            this.expectedBeforeRestoreStrings = Arrays.asList(expectedRows);
+            return this;
+        }
+
+        public Builder consumedAfterRestore(Row... expectedRows) {
+            this.expectedAfterRestore = Arrays.asList(expectedRows);
+            return this;
+        }
+
+        public Builder consumedAfterRestore(String... expectedRows) {
+            this.expectedAfterRestoreStrings = Arrays.asList(expectedRows);
+            return this;
+        }
+
+        public SinkTestStep build() {
+            return new SinkTestStep(
+                    name,
+                    schemaComponents,
+                    partitionKeys,
+                    options,
+                    expectedBeforeRestore,
+                    expectedAfterRestore,
+                    expectedBeforeRestoreStrings,
+                    expectedAfterRestoreStrings);
+        }
+    }
 }
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/SourceTestStep.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/SourceTestStep.java
index eec3b1677b0..6653eb174c5 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/SourceTestStep.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/SourceTestStep.java
@@ -20,6 +20,8 @@ package org.apache.flink.table.test.program;
 
 import org.apache.flink.types.Row;
 
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
@@ -41,6 +43,11 @@ public final class SourceTestStep extends TableTestStep {
         this.dataAfterRestore = dataAfterRestore;
     }
 
+    /** Builder for creating a {@link SourceTestStep}. */
+    public static Builder newBuilder(String name) {
+        return new Builder(name);
+    }
+
     @Override
     public TestKind getKind() {
         return dataBeforeRestore.isEmpty()
@@ -49,4 +56,39 @@ public final class SourceTestStep extends TableTestStep {
                         ? TestKind.SOURCE_WITH_DATA
                         : TestKind.SOURCE_WITH_RESTORE_DATA;
     }
+
+    /** Builder pattern for {@link SourceTestStep}. */
+    public static final class Builder extends AbstractBuilder<Builder> {
+
+        private final List<Row> dataBeforeRestore = new ArrayList<>();
+        private final List<Row> dataAfterRestore = new ArrayList<>();
+
+        private Builder(String name) {
+            super(name);
+        }
+
+        public Builder producedValues(Row... data) {
+            return producedBeforeRestore(data);
+        }
+
+        public Builder producedBeforeRestore(Row... data) {
+            this.dataBeforeRestore.addAll(Arrays.asList(data));
+            return this;
+        }
+
+        public Builder producedAfterRestore(Row... data) {
+            this.dataAfterRestore.addAll(Arrays.asList(data));
+            return this;
+        }
+
+        public SourceTestStep build() {
+            return new SourceTestStep(
+                    name,
+                    schemaComponents,
+                    partitionKeys,
+                    options,
+                    dataBeforeRestore,
+                    dataAfterRestore);
+        }
+    }
 }
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestProgram.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestProgram.java
index efec060315d..731c967bba1 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestProgram.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestProgram.java
@@ -19,21 +19,16 @@
 package org.apache.flink.table.test.program;
 
 import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigurationUtils;
-import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.functions.UserDefinedFunction;
 import org.apache.flink.table.test.program.FunctionTestStep.FunctionBehavior;
 import org.apache.flink.table.test.program.FunctionTestStep.FunctionPersistence;
 import org.apache.flink.table.test.program.TestStep.TestKind;
-import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.EnumSet;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.stream.Collectors;
 
 /**
@@ -250,205 +245,40 @@ public class TableTestProgram {
             return this;
         }
 
-        /** Setup step for building a table source. */
-        public SourceBuilder setupTableSource(String name) {
-            return new SourceBuilder(name, setupSteps, this);
-        }
-
-        /** Setup step for building a table sink. */
-        public SinkBuilder setupTableSink(String name) {
-            return new SinkBuilder(name, setupSteps, this);
-        }
-
-        /** Run step for executing SQL. */
-        public Builder runSql(String sql) {
-            this.runSteps.add(new SqlTestStep(sql));
-            return this;
-        }
-
-        /** Run step for executing a statement set. */
-        public StatementSetBuilder runStatementSet() {
-            return new StatementSetBuilder(runSteps, this);
-        }
-
-        public TableTestProgram build() {
-            return new TableTestProgram(id, description, setupSteps, runSteps);
-        }
-    }
-
-    /** Builder pattern for {@link SourceTestStep} and {@link SinkTestStep}. */
-    @SuppressWarnings("unchecked")
-    private static class TableBuilder<SpecificBuilder extends TableBuilder<SpecificBuilder>> {
-
-        protected final String name;
-        protected final List<TestStep> targetSteps;
-        protected final Builder rootBuilder;
-
-        protected final List<String> schemaComponents = new ArrayList<>();
-        protected final List<String> partitionKeys = new ArrayList<>();
-        protected final Map<String, String> options = new HashMap<>();
-
-        private TableBuilder(String name, List<TestStep> targetSteps, Builder rootBuilder) {
-            this.name = name;
-            this.targetSteps = targetSteps;
-            this.rootBuilder = rootBuilder;
-        }
-
-        /**
-         * Define the schema like you would in SQL e.g. "my_col INT", "PRIMARY KEY (uid) NOT
-         * ENFORCED", or "WATERMARK FOR ts AS ts".
-         */
-        public SpecificBuilder withSchema(String... schemaComponents) {
-            this.schemaComponents.addAll(Arrays.asList(schemaComponents));
-            return (SpecificBuilder) this;
-        }
-
         /**
-         * Unless the test requires a very specific configuration, try to avoid calling this method
-         * and fill in options later via {@link TableTestStep#apply(TableEnvironment, Map)}.
-         */
-        public SpecificBuilder withOptions(Map<String, String> options) {
-            this.options.putAll(options);
-            return (SpecificBuilder) this;
-        }
-
-        /**
-         * Unless the test requires a very specific configuration, try to avoid calling this method
-         * and fill in options later via {@link TableTestStep#apply(TableEnvironment, Map)}.
+         * Setup step for a table source.
+         *
+         * <p>Use {@link SourceTestStep.Builder} to construct this step.
          */
-        public SpecificBuilder withOption(String key, String value) {
-            this.options.put(key, value);
-            return (SpecificBuilder) this;
+        public Builder setupTableSource(SourceTestStep sourceTestStep) {
+            setupSteps.add(sourceTestStep);
+            return this;
         }
 
         /**
-         * Unless the test requires a very specific configuration, try to avoid calling this method
-         * and fill in options later via {@link TableTestStep#apply(TableEnvironment, Map)}.
+         * Setup step for a table sink.
+         *
+         * <p>Use {@link SinkTestStep.Builder} to construct this step.
          */
-        public <T> SpecificBuilder withOption(ConfigOption<T> option, String value) {
-            this.options.put(option.key(), ConfigurationUtils.convertValue(value, String.class));
-            return (SpecificBuilder) this;
-        }
-
-        public SpecificBuilder withPartitionKeys(String... partitionKeys) {
-            this.partitionKeys.addAll(Arrays.asList(partitionKeys));
-            return (SpecificBuilder) this;
-        }
-    }
-
-    /** Builder pattern for {@link SourceTestStep}. */
-    public static class SourceBuilder extends TableBuilder<SourceBuilder> {
-
-        private final List<Row> dataBeforeRestore = new ArrayList<>();
-        private final List<Row> dataAfterRestore = new ArrayList<>();
-
-        private SourceBuilder(String name, List<TestStep> targetSteps, Builder rootBuilder) {
-            super(name, targetSteps, rootBuilder);
-        }
-
-        public SourceBuilder withValues(Row... data) {
-            return withValuesBeforeRestore(data);
-        }
-
-        public SourceBuilder withValuesBeforeRestore(Row... data) {
-            this.dataBeforeRestore.addAll(Arrays.asList(data));
-            return this;
-        }
-
-        public SourceBuilder withValuesAfterRestore(Row... data) {
-            this.dataAfterRestore.addAll(Arrays.asList(data));
-            return this;
-        }
-
-        public Builder complete() {
-            targetSteps.add(
-                    new SourceTestStep(
-                            name,
-                            schemaComponents,
-                            partitionKeys,
-                            options,
-                            dataBeforeRestore,
-                            dataAfterRestore));
-            return rootBuilder;
-        }
-    }
-
-    /** Builder pattern for {@link SinkTestStep}. */
-    public static class SinkBuilder extends TableBuilder<SinkBuilder> {
-
-        private List<Row> expectedBeforeRestore;
-        private List<Row> expectedAfterRestore;
-
-        private List<String> expectedBeforeRestoreStrings;
-        private List<String> expectedAfterRestoreStrings;
-
-        private SinkBuilder(String name, List<TestStep> targetSteps, Builder rootBuilder) {
-            super(name, targetSteps, rootBuilder);
-        }
-
-        public SinkBuilder withExpectedValues(Row... expectedRows) {
-            return withValuesBeforeRestore(expectedRows);
-        }
-
-        public SinkBuilder withExpectedValues(String... expectedRows) {
-            return withValuesBeforeRestore(expectedRows);
-        }
-
-        public SinkBuilder withValuesBeforeRestore(Row... expectedRows) {
-            this.expectedBeforeRestore = Arrays.asList(expectedRows);
-            return this;
-        }
-
-        public SinkBuilder withValuesBeforeRestore(String... expectedRows) {
-            this.expectedBeforeRestoreStrings = Arrays.asList(expectedRows);
+        public Builder setupTableSink(SinkTestStep sinkTestStep) {
+            setupSteps.add(sinkTestStep);
             return this;
         }
 
-        public SinkBuilder withValuesAfterRestore(Row... expectedRows) {
-            this.expectedAfterRestore = Arrays.asList(expectedRows);
-            return this;
-        }
-
-        public SinkBuilder withValuesAfterRestore(String... expectedRows) {
-            this.expectedAfterRestoreStrings = Arrays.asList(expectedRows);
+        /** Run step for executing SQL. */
+        public Builder runSql(String sql) {
+            this.runSteps.add(new SqlTestStep(sql));
             return this;
         }
 
-        public Builder complete() {
-            targetSteps.add(
-                    new SinkTestStep(
-                            name,
-                            schemaComponents,
-                            partitionKeys,
-                            options,
-                            expectedBeforeRestore,
-                            expectedAfterRestore,
-                            expectedBeforeRestoreStrings,
-                            expectedAfterRestoreStrings));
-            return rootBuilder;
-        }
-    }
-
-    /** Builder pattern for {@link StatementSetTestStep}. */
-    public static class StatementSetBuilder {
-
-        private final List<TestStep> targetSteps;
-        private final Builder rootBuilder;
-        private final List<String> statements = new ArrayList<>();
-
-        private StatementSetBuilder(List<TestStep> targetSteps, Builder rootBuilder) {
-            this.targetSteps = targetSteps;
-            this.rootBuilder = rootBuilder;
-        }
-
-        public StatementSetBuilder withSql(String sql) {
-            this.statements.add(sql);
+        /** Run step for executing a statement set. */
+        public Builder runStatementSet(String... sql) {
+            this.runSteps.add(new StatementSetTestStep(Arrays.asList(sql)));
             return this;
         }
 
-        public Builder complete() {
-            this.targetSteps.add(new StatementSetTestStep(statements));
-            return rootBuilder;
+        public TableTestProgram build() {
+            return new TableTestProgram(id, description, setupSteps, runSteps);
         }
     }
 }
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestStep.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestStep.java
index bff700d9a15..1d0207f7126 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestStep.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestStep.java
@@ -18,9 +18,13 @@
 
 package org.apache.flink.table.test.program;
 
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableResult;
 
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -70,4 +74,61 @@ public abstract class TableTestStep implements TestStep {
 
         return env.executeSql(createTable);
     }
+
+    /** Builder pattern for {@link SourceTestStep} and {@link SinkTestStep}. */
+    @SuppressWarnings("unchecked")
+    protected abstract static class AbstractBuilder<
+            SpecificBuilder extends AbstractBuilder<SpecificBuilder>> {
+
+        protected final String name;
+
+        protected final List<String> schemaComponents = new ArrayList<>();
+        protected final List<String> partitionKeys = new ArrayList<>();
+        protected final Map<String, String> options = new HashMap<>();
+
+        protected AbstractBuilder(String name) {
+            this.name = name;
+        }
+
+        /**
+         * Define the schema like you would in SQL e.g. "my_col INT", "PRIMARY KEY (uid) NOT
+         * ENFORCED", or "WATERMARK FOR ts AS ts".
+         */
+        public SpecificBuilder addSchema(String... schemaComponents) {
+            this.schemaComponents.addAll(Arrays.asList(schemaComponents));
+            return (SpecificBuilder) this;
+        }
+
+        /**
+         * Unless the test requires a very specific configuration, try to avoid calling this method
+         * and fill in options later via {@link TableTestStep#apply(TableEnvironment, Map)}.
+         */
+        public SpecificBuilder addOptions(Map<String, String> options) {
+            this.options.putAll(options);
+            return (SpecificBuilder) this;
+        }
+
+        /**
+         * Unless the test requires a very specific configuration, try to avoid calling this method
+         * and fill in options later via {@link TableTestStep#apply(TableEnvironment, Map)}.
+         */
+        public SpecificBuilder addOption(String key, String value) {
+            this.options.put(key, value);
+            return (SpecificBuilder) this;
+        }
+
+        /**
+         * Unless the test requires a very specific configuration, try to avoid calling this method
+         * and fill in options later via {@link TableTestStep#apply(TableEnvironment, Map)}.
+         */
+        public <T> SpecificBuilder addOption(ConfigOption<T> option, String value) {
+            this.options.put(option.key(), ConfigurationUtils.convertValue(value, String.class));
+            return (SpecificBuilder) this;
+        }
+
+        public SpecificBuilder addPartitionKeys(String... partitionKeys) {
+            this.partitionKeys.addAll(Arrays.asList(partitionKeys));
+            return (SpecificBuilder) this;
+        }
+    }
 }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/CalcTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/CalcTestPrograms.java
index 534d50a26bb..feb151ba9ef 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/CalcTestPrograms.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/CalcTestPrograms.java
@@ -19,6 +19,8 @@
 package org.apache.flink.table.planner.plan.nodes.exec.testutils;
 
 import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc;
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
 import org.apache.flink.table.test.program.TableTestProgram;
 import org.apache.flink.types.Row;
 
@@ -27,16 +29,18 @@ public class CalcTestPrograms {
 
     static final TableTestProgram SIMPLE_CALC =
             TableTestProgram.of("calc-simple", "validates basic calc node")
-                    .setupTableSource("t")
-                    .withSchema("a BIGINT", "b DOUBLE")
-                    .withValuesBeforeRestore(Row.of(420L, 42.0))
-                    .withValuesAfterRestore(Row.of(421L, 42.1))
-                    .complete()
-                    .setupTableSink("sink_t")
-                    .withSchema("a BIGINT", "b DOUBLE")
-                    .withValuesBeforeRestore(Row.of(421L, 42.0))
-                    .withValuesAfterRestore(Row.of(422L, 42.1))
-                    .complete()
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("t")
+                                    .addSchema("a BIGINT", "b DOUBLE")
+                                    .producedBeforeRestore(Row.of(420L, 42.0))
+                                    .producedAfterRestore(Row.of(421L, 42.1))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema("a BIGINT", "b DOUBLE")
+                                    .consumedBeforeRestore(Row.of(421L, 42.0))
+                                    .consumedAfterRestore(Row.of(422L, 42.1))
+                                    .build())
                     .runSql("INSERT INTO sink_t SELECT a + 1, b FROM t")
                     .build();
 }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/test/program/TableTestProgramRunnerTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/test/program/TableTestProgramRunnerTest.java
index e7b58aaa864..a0d114c1178 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/test/program/TableTestProgramRunnerTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/test/program/TableTestProgramRunnerTest.java
@@ -97,14 +97,16 @@ public class TableTestProgramRunnerTest {
     void testTableStep() {
         final TableTestProgram program =
                 TableTestProgram.of(ID, DESCRIPTION)
-                        .setupTableSource("MyTableSource")
-                        .withSchema("i INT")
-                        .withOption("connector", "datagen")
-                        .complete()
-                        .setupTableSource("MyTableSink")
-                        .withSchema("i INT")
-                        .withOption("connector", "blackhole")
-                        .complete()
+                        .setupTableSource(
+                                SourceTestStep.newBuilder("MyTableSource")
+                                        .addSchema("i INT")
+                                        .addOption("connector", "datagen")
+                                        .build())
+                        .setupTableSink(
+                                SinkTestStep.newBuilder("MyTableSink")
+                                        .addSchema("i INT")
+                                        .addOption("connector", "blackhole")
+                                        .build())
                         .build();
 
         assertThat(program.setupSteps).hasSize(2);
@@ -127,8 +129,7 @@ public class TableTestProgramRunnerTest {
                         "CREATE TABLE `default_catalog`.`default_database`.`MyTableSink` (\n"
                                 + "  `i` INT\n"
                                 + ") WITH (\n"
-                                + "  'connector' = 'blackhole',\n"
-                                + "  'number-of-rows' = '3'\n"
+                                + "  'connector' = 'blackhole'\n"
                                 + ")\n");
     }
 

Reply via email to