This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 347e4ca6c26 [FLINK-25809][table-api-java] Add table test program 
infrastructure
347e4ca6c26 is described below

commit 347e4ca6c265334a35969d1c8358ff5a9f066e92
Author: Timo Walther <twal...@apache.org>
AuthorDate: Wed Oct 25 16:17:27 2023 +0200

    [FLINK-25809][table-api-java] Add table test program infrastructure
---
 .../table/test/program/ConfigOptionTestStep.java   |  43 ++
 .../flink/table/test/program/FunctionTestStep.java |  75 ++++
 .../flink/table/test/program/SinkTestStep.java     |  55 +++
 .../flink/table/test/program/SourceTestStep.java   |  52 +++
 .../flink/table/test/program/SqlTestStep.java      |  46 ++
 .../table/test/program/StatementSetTestStep.java   |  46 ++
 .../flink/table/test/program/TableTestProgram.java | 472 +++++++++++++++++++++
 .../table/test/program/TableTestProgramRunner.java | 112 +++++
 .../flink/table/test/program/TableTestStep.java    |  73 ++++
 .../apache/flink/table/test/program/TestStep.java  |  54 +++
 .../test/program/TableTestProgramRunnerTest.java   | 182 ++++++++
 11 files changed, 1210 insertions(+)

diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/ConfigOptionTestStep.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/ConfigOptionTestStep.java
new file mode 100644
index 00000000000..7f66122f1cd
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/ConfigOptionTestStep.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.test.program;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.api.TableEnvironment;
+
+/** Test step for setting a {@link ConfigOption}. */
+public final class ConfigOptionTestStep<T> implements TestStep {
+
+    public final ConfigOption<T> option;
+    public final T value;
+
+    ConfigOptionTestStep(ConfigOption<T> option, T value) {
+        this.option = option;
+        this.value = value;
+    }
+
+    @Override
+    public TestKind getKind() {
+        return TestKind.CONFIG;
+    }
+
+    public void apply(TableEnvironment env) {
+        env.getConfig().set(option, value);
+    }
+}
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/FunctionTestStep.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/FunctionTestStep.java
new file mode 100644
index 00000000000..ad377bae4cd
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/FunctionTestStep.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.test.program;
+
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.functions.UserDefinedFunction;
+
+/** Test step for registering a (temporary) (system or catalog) function. */
+public final class FunctionTestStep implements TestStep {
+
+    /** Whether function should be temporary or not. */
+    enum FunctionPersistence {
+        TEMPORARY,
+        PERSISTENT
+    }
+
+    /** Whether function should be persisted in a catalog or not. */
+    enum FunctionBehavior {
+        SYSTEM,
+        CATALOG
+    }
+
+    public final FunctionPersistence persistence;
+    public final FunctionBehavior behavior;
+    public final String name;
+    public final Class<? extends UserDefinedFunction> function;
+
+    FunctionTestStep(
+            FunctionPersistence persistence,
+            FunctionBehavior behavior,
+            String name,
+            Class<? extends UserDefinedFunction> function) {
+        this.persistence = persistence;
+        this.behavior = behavior;
+        this.name = name;
+        this.function = function;
+    }
+
+    @Override
+    public TestKind getKind() {
+        return TestKind.FUNCTION;
+    }
+
+    public void apply(TableEnvironment env) {
+        if (behavior == FunctionBehavior.SYSTEM) {
+            if (persistence == FunctionPersistence.TEMPORARY) {
+                env.createTemporarySystemFunction(name, function);
+            } else {
+                throw new UnsupportedOperationException("System functions must 
be temporary.");
+            }
+        } else {
+            if (persistence == FunctionPersistence.TEMPORARY) {
+                env.createTemporaryFunction(name, function);
+            } else {
+                env.createFunction(name, function);
+            }
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/SinkTestStep.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/SinkTestStep.java
new file mode 100644
index 00000000000..42bfbb9da87
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/SinkTestStep.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.test.program;
+
+import org.apache.flink.types.Row;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Map;
+import java.util.function.Predicate;
+
+/** Test step for creating a table sink. */
+public final class SinkTestStep extends TableTestStep {
+
+    public final @Nullable Predicate<List<Row>> expectedBeforeRestore;
+    public final @Nullable Predicate<List<Row>> expectedAfterRestore;
+
+    SinkTestStep(
+            String name,
+            List<String> schemaComponents,
+            List<String> partitionKeys,
+            Map<String, String> options,
+            @Nullable Predicate<List<Row>> expectedBeforeRestore,
+            @Nullable Predicate<List<Row>> expectedAfterRestore) {
+        super(name, schemaComponents, partitionKeys, options);
+        this.expectedBeforeRestore = expectedBeforeRestore;
+        this.expectedAfterRestore = expectedAfterRestore;
+    }
+
+    @Override
+    public TestKind getKind() {
+        return expectedBeforeRestore == null
+                ? TestKind.SINK_WITHOUT_DATA
+                : expectedAfterRestore == null
+                        ? TestKind.SINK_WITH_DATA
+                        : TestKind.SINK_WITH_RESTORE_DATA;
+    }
+}
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/SourceTestStep.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/SourceTestStep.java
new file mode 100644
index 00000000000..eec3b1677b0
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/SourceTestStep.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.test.program;
+
+import org.apache.flink.types.Row;
+
+import java.util.List;
+import java.util.Map;
+
+/** Test step for creating a table source. */
+public final class SourceTestStep extends TableTestStep {
+
+    public final List<Row> dataBeforeRestore;
+    public final List<Row> dataAfterRestore;
+
+    SourceTestStep(
+            String name,
+            List<String> schemaComponents,
+            List<String> partitionKeys,
+            Map<String, String> options,
+            List<Row> dataBeforeRestore,
+            List<Row> dataAfterRestore) {
+        super(name, schemaComponents, partitionKeys, options);
+        this.dataBeforeRestore = dataBeforeRestore;
+        this.dataAfterRestore = dataAfterRestore;
+    }
+
+    @Override
+    public TestKind getKind() {
+        return dataBeforeRestore.isEmpty()
+                ? TestKind.SOURCE_WITHOUT_DATA
+                : dataAfterRestore.isEmpty()
+                        ? TestKind.SOURCE_WITH_DATA
+                        : TestKind.SOURCE_WITH_RESTORE_DATA;
+    }
+}
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/SqlTestStep.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/SqlTestStep.java
new file mode 100644
index 00000000000..c6809dc16cf
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/SqlTestStep.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.test.program;
+
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+
+/**
+ * Test step for execution SQL.
+ *
+ * <p>Note: Not every runner supports generic SQL statements. Sometimes the 
runner would like to
+ * enrich properties e.g. of a CREATE TABLE. Use this step with caution.
+ */
+public final class SqlTestStep implements TestStep {
+
+    public final String sql;
+
+    SqlTestStep(String sql) {
+        this.sql = sql;
+    }
+
+    @Override
+    public TestKind getKind() {
+        return TestKind.SQL;
+    }
+
+    public TableResult apply(TableEnvironment env) {
+        return env.executeSql(sql);
+    }
+}
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/StatementSetTestStep.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/StatementSetTestStep.java
new file mode 100644
index 00000000000..98cf44ecc5a
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/StatementSetTestStep.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.test.program;
+
+import org.apache.flink.table.api.StatementSet;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+
+import java.util.List;
+
+/** Test step for creating a statement set. */
+public final class StatementSetTestStep implements TestStep {
+
+    public final List<String> statements;
+
+    StatementSetTestStep(List<String> statements) {
+        this.statements = statements;
+    }
+
+    @Override
+    public TestKind getKind() {
+        return TestKind.STATEMENT_SET;
+    }
+
+    public TableResult apply(TableEnvironment env) {
+        final StatementSet statementSet = env.createStatementSet();
+        statements.forEach(statementSet::addInsertSql);
+        return statementSet.execute();
+    }
+}
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestProgram.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestProgram.java
new file mode 100644
index 00000000000..1b194b39ddd
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestProgram.java
@@ -0,0 +1,472 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.test.program;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.functions.UserDefinedFunction;
+import org.apache.flink.table.test.program.FunctionTestStep.FunctionBehavior;
+import 
org.apache.flink.table.test.program.FunctionTestStep.FunctionPersistence;
+import org.apache.flink.table.test.program.TestStep.TestKind;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.collections.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+/**
+ * A generic declaration of a table program for testing.
+ *
+ * <p>A test program defines the basic test pipeline (from source to sink) and 
required artifacts
+ * such as table sources and sinks, configuration options, and user-defined 
functions. Because some
+ * programs need to create artifacts in a certain order, a program consists of 
individual {@link
+ * TestStep}s for setting up the test and the actual running of the test.
+ *
+ * <p>Tests programs are intended to reduce code duplication and test the same 
SQL statement though
+ * different layers of the stack. Different {@link TableTestProgramRunner}s 
can share the same
+ * program and enrich it with custom implementation and assertions.
+ *
+ * <p>For example, a SQL query such as {@code SELECT * FROM (VALUES (1), (2), 
(3))} can be declared
+ * once and can be shared among different tests for integration testing, 
optimizer plan testing,
+ * compiled plan testing, transformation testing, and others.
+ *
+ * <p>A typical implementation looks like:
+ *
+ * <pre>{@code
+ * // Define the behavior and configuration of an operation.
+ * public class CalcTestPrograms {
+ *     public static final TableTestProgram CALC_SIMPLE = 
TableTestProgram.of("calc-simple") ...;
+ *     public static final TableTestProgram CALC_COMPLEX = 
TableTestProgram.of("calc-complex") ...;
+ * }
+ *
+ * // Define a test base for example for plan testing
+ * public abstract class PlanTestBase implements TableTestProgramRunner {
+ *     // The test base declares what kind of steps it can apply.
+ *     public Set<TestStep.Kind> supportedSetupSteps() { return 
EnumSet.of(SOURCE_WITH_DATA, SINK_WITH_DATA); }
+ *     public Set<TestStep.Kind> supportedRunSteps() { return EnumSet.of(SQL); 
}
+ *
+ *     // Leave the list of programs up to the concrete test
+ *     public abstract List<TableTestProgram> programs();
+ *
+ *     @ParameterizedTest
+ *     @MethodSource("supportedPrograms")
+ *     public void test(TableTestProgram program) {
+ *         TableEnvironment env = ...;
+ *         program.getSetupSourceTestSteps().forEach(s -> s.apply(env));
+ *         program.getSetupSinkTestSteps().forEach(s -> s.apply(env));
+ *         assertThat(program.getRunSqlTestStep().apply(env)).contains(...);
+ *     }
+ * }
+ *
+ * // Run the test base for a category of test programs.
+ * public class CalcPlanTest extends PlanTestBase {
+ *     public List<TableTestProgram> programs() = { return 
Arrays.asList(CALC_SIMPLE, CALC_COMPLEX); }
+ * }
+ * }</pre>
+ */
+public class TableTestProgram {
+
+    /** Identifier of the test program (e.g. for naming generated files). */
+    public final String id;
+
+    /** Description for internal documentation. */
+    public final String description;
+
+    /** Steps to be executed for setting up an environment. */
+    public final List<TestStep> setupSteps;
+
+    /** Steps to be executed for running the actual test. */
+    public final List<TestStep> runSteps;
+
+    private TableTestProgram(
+            String id, String description, List<TestStep> setupSteps, 
List<TestStep> runSteps) {
+        this.id = id;
+        this.description = description;
+        this.setupSteps = setupSteps;
+        this.runSteps = runSteps;
+    }
+
+    /**
+     * Entrypoint for a {@link TableTestProgram} that forces an identifier and 
description of the
+     * test program.
+     *
+     * <p>The identifier is necessary to (ideally globally) identify the test 
program in outputs.
+     * For example, a runner for plan tests can create directories and use the 
name as file names.
+     *
+     * <p>The description should give more context and should start with a 
verb and "s" suffix.
+     *
+     * <p>For example:
+     *
+     * <ul>
+     *   <li>TableTestProgram.of("join-outer", "tests outer joins")
+     *   <li>TableTestProgram.of("rank-x-enabled", "validates a rank with 
config flag 'x' set")
+     *   <li>TableTestProgram.of("calc-with-projection", "verifies FLINK-12345 
is fixed due to
+     *       missing row projection")
+     * </ul>
+     */
+    public static Builder of(String id, String description) {
+        return new Builder(id, description);
+    }
+
+    /** Convenience method to avoid casting. It assumes that the order of 
steps is not important. */
+    public List<SourceTestStep> getSetupSourceTestSteps() {
+        final EnumSet<TestKind> sourceKinds =
+                EnumSet.of(
+                        TestKind.SOURCE_WITHOUT_DATA,
+                        TestKind.SOURCE_WITH_DATA,
+                        TestKind.SOURCE_WITH_RESTORE_DATA);
+        return setupSteps.stream()
+                .filter(s -> sourceKinds.contains(s.getKind()))
+                .map(SourceTestStep.class::cast)
+                .collect(Collectors.toList());
+    }
+
+    /** Convenience method to avoid casting. It assumes that the order of 
steps is not important. */
+    public List<SinkTestStep> getSetupSinkTestSteps() {
+        final EnumSet<TestKind> sinkKinds =
+                EnumSet.of(
+                        TestKind.SINK_WITHOUT_DATA,
+                        TestKind.SINK_WITH_DATA,
+                        TestKind.SINK_WITH_RESTORE_DATA);
+        return setupSteps.stream()
+                .filter(s -> sinkKinds.contains(s.getKind()))
+                .map(SinkTestStep.class::cast)
+                .collect(Collectors.toList());
+    }
+
+    /** Convenience method to avoid casting. It assumes that the order of 
steps is not important. */
+    public List<ConfigOptionTestStep<?>> getSetupConfigOptionTestSteps() {
+        return setupSteps.stream()
+                .filter(s -> s.getKind() == TestKind.CONFIG)
+                .map(s -> (ConfigOptionTestStep<?>) s)
+                .collect(Collectors.toList());
+    }
+
+    /** Convenience method to avoid casting. It assumes that the order of 
steps is not important. */
+    public List<FunctionTestStep> getSetupFunctionTestSteps() {
+        return setupSteps.stream()
+                .filter(s -> s.getKind() == TestKind.FUNCTION)
+                .map(FunctionTestStep.class::cast)
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Convenience method to avoid boilerplate code. It assumes that only a 
single SQL statement is
+     * tested.
+     */
+    public SqlTestStep getRunSqlTestStep() {
+        Preconditions.checkArgument(
+                runSteps.size() == 1 && runSteps.get(0).getKind() == 
TestKind.SQL,
+                "Single SQL step expected.");
+        return (SqlTestStep) runSteps.get(0);
+    }
+
+    /** Builder pattern for {@link TableTestProgram}. */
+    public static class Builder {
+
+        private final String id;
+        private final String description;
+        private final List<TestStep> setupSteps = new ArrayList<>();
+        private final List<TestStep> runSteps = new ArrayList<>();
+
+        private Builder(String id, String description) {
+            this.id = id;
+            this.description = description;
+        }
+
+        /**
+         * Setup step for execution SQL.
+         *
+         * <p>Note: Not every runner supports generic SQL statements. 
Sometimes the runner would
+         * like to enrich properties e.g. of a CREATE TABLE. Use this step 
with caution.
+         */
+        public Builder setupSql(String sql) {
+            this.setupSteps.add(new SqlTestStep(sql));
+            return this;
+        }
+
+        /** Setup step for setting a {@link ConfigOption}. */
+        public <T> Builder setupConfig(ConfigOption<T> option, T value) {
+            this.setupSteps.add(new ConfigOptionTestStep<>(option, value));
+            return this;
+        }
+
+        /** Setup step for registering a temporary system function. */
+        public Builder setupTemporarySystemFunction(
+                String name, Class<? extends UserDefinedFunction> function) {
+            this.setupSteps.add(
+                    new FunctionTestStep(
+                            FunctionPersistence.TEMPORARY,
+                            FunctionBehavior.SYSTEM,
+                            name,
+                            function));
+            return this;
+        }
+
+        /** Setup step for registering a temporary catalog function. */
+        public Builder setupTemporaryCatalogFunction(
+                String name, Class<? extends UserDefinedFunction> function) {
+            this.setupSteps.add(
+                    new FunctionTestStep(
+                            FunctionPersistence.TEMPORARY,
+                            FunctionBehavior.CATALOG,
+                            name,
+                            function));
+            return this;
+        }
+
+        /** Setup step for registering a catalog function. */
+        public Builder setupCatalogFunction(
+                String name, Class<? extends UserDefinedFunction> function) {
+            this.setupSteps.add(
+                    new FunctionTestStep(
+                            FunctionPersistence.PERSISTENT,
+                            FunctionBehavior.CATALOG,
+                            name,
+                            function));
+            return this;
+        }
+
+        /** Setup step for building a table source. */
+        public SourceBuilder setupTableSource(String name) {
+            return new SourceBuilder(name, setupSteps, this);
+        }
+
+        /** Setup step for building a table sink. */
+        public SinkBuilder setupTableSink(String name) {
+            return new SinkBuilder(name, setupSteps, this);
+        }
+
+        /** Run step for executing SQL. */
+        public Builder runSql(String sql) {
+            this.runSteps.add(new SqlTestStep(sql));
+            return this;
+        }
+
+        /** Run step for executing a statement set. */
+        public StatementSetBuilder runStatementSet() {
+            return new StatementSetBuilder(runSteps, this);
+        }
+
+        public TableTestProgram build() {
+            return new TableTestProgram(id, description, setupSteps, runSteps);
+        }
+    }
+
+    /** Builder pattern for {@link SourceTestStep} and {@link SinkTestStep}. */
+    @SuppressWarnings("unchecked")
+    private static class TableBuilder<SpecificBuilder extends 
TableBuilder<SpecificBuilder>> {
+
+        protected final String name;
+        protected final List<TestStep> targetSteps;
+        protected final Builder rootBuilder;
+
+        protected final List<String> schemaComponents = new ArrayList<>();
+        protected final List<String> partitionKeys = new ArrayList<>();
+        protected final Map<String, String> options = new HashMap<>();
+
+        private TableBuilder(String name, List<TestStep> targetSteps, Builder 
rootBuilder) {
+            this.name = name;
+            this.targetSteps = targetSteps;
+            this.rootBuilder = rootBuilder;
+        }
+
+        /**
+         * Define the schema like you would in SQL e.g. "my_col INT", "PRIMARY 
KEY (uid) NOT
+         * ENFORCED", or "WATERMARK FOR ts AS ts".
+         */
+        public SpecificBuilder withSchema(String... schemaComponents) {
+            this.schemaComponents.addAll(Arrays.asList(schemaComponents));
+            return (SpecificBuilder) this;
+        }
+
+        /**
+         * Unless the test requires a very specific configuration, try to 
avoid calling this method
+         * and fill in options later via {@link 
TableTestStep#apply(TableEnvironment, Map)}.
+         */
+        public SpecificBuilder withOptions(Map<String, String> options) {
+            this.options.putAll(options);
+            return (SpecificBuilder) this;
+        }
+
+        /**
+         * Unless the test requires a very specific configuration, try to 
avoid calling this method
+         * and fill in options later via {@link 
TableTestStep#apply(TableEnvironment, Map)}.
+         */
+        public SpecificBuilder withOption(String key, String value) {
+            this.options.put(key, value);
+            return (SpecificBuilder) this;
+        }
+
+        /**
+         * Unless the test requires a very specific configuration, try to 
avoid calling this method
+         * and fill in options later via {@link 
TableTestStep#apply(TableEnvironment, Map)}.
+         */
+        public <T> SpecificBuilder withOption(ConfigOption<T> option, String 
value) {
+            this.options.put(option.key(), 
ConfigurationUtils.convertValue(value, String.class));
+            return (SpecificBuilder) this;
+        }
+
+        public SpecificBuilder withPartitionKeys(String... partitionKeys) {
+            this.partitionKeys.addAll(Arrays.asList(partitionKeys));
+            return (SpecificBuilder) this;
+        }
+    }
+
+    /** Builder pattern for {@link SourceTestStep}. */
+    public static class SourceBuilder extends TableBuilder<SourceBuilder> {
+
+        private final List<Row> dataBeforeRestore = new ArrayList<>();
+        private final List<Row> dataAfterRestore = new ArrayList<>();
+
+        private SourceBuilder(String name, List<TestStep> targetSteps, Builder 
rootBuilder) {
+            super(name, targetSteps, rootBuilder);
+        }
+
+        public SourceBuilder withValues(Row... data) {
+            return withValuesBeforeRestore(data);
+        }
+
+        public SourceBuilder withValuesBeforeRestore(Row... data) {
+            this.dataBeforeRestore.addAll(Arrays.asList(data));
+            return this;
+        }
+
+        public SourceBuilder withValuesAfterRestore(Row... data) {
+            this.dataAfterRestore.addAll(Arrays.asList(data));
+            return this;
+        }
+
+        public Builder complete() {
+            targetSteps.add(
+                    new SourceTestStep(
+                            name,
+                            schemaComponents,
+                            partitionKeys,
+                            options,
+                            dataBeforeRestore,
+                            dataAfterRestore));
+            return rootBuilder;
+        }
+    }
+
+    /** Builder pattern for {@link SinkTestStep}. */
+    public static class SinkBuilder extends TableBuilder<SinkBuilder> {
+
+        private Predicate<List<Row>> expectedBeforeRestore;
+        private Predicate<List<Row>> expectedAfterRestore;
+
+        private SinkBuilder(String name, List<TestStep> targetSteps, Builder 
rootBuilder) {
+            super(name, targetSteps, rootBuilder);
+        }
+
+        public SinkBuilder withExpectedValues(Row... expectedRows) {
+            return withValuesBeforeRestore(expectedRows);
+        }
+
+        public SinkBuilder withExpectedValues(String... expectedRows) {
+            return withValuesBeforeRestore(expectedRows);
+        }
+
+        public SinkBuilder withValuesBeforeRestore(Row... expectedRows) {
+            this.expectedBeforeRestore = equalIgnoringOrder(expectedRows);
+            return this;
+        }
+
+        public SinkBuilder withValuesBeforeRestore(String... expectedRows) {
+            this.expectedBeforeRestore = equalIgnoringOrder(expectedRows);
+            return this;
+        }
+
+        public SinkBuilder withValuesAfterRestore(Row... expectedRows) {
+            this.expectedAfterRestore = equalIgnoringOrder(expectedRows);
+            return this;
+        }
+
+        public SinkBuilder withValuesAfterRestore(String... expectedRows) {
+            this.expectedAfterRestore = equalIgnoringOrder(expectedRows);
+            return this;
+        }
+
+        private static Predicate<List<Row>> equalIgnoringOrder(Row... 
expectedRows) {
+            return (actualRows) -> {
+                if (actualRows.size() != expectedRows.length) {
+                    return false;
+                }
+                return CollectionUtils.isEqualCollection(actualRows, 
Arrays.asList(expectedRows));
+            };
+        }
+
+        private static Predicate<List<Row>> equalIgnoringOrder(String... 
expectedRows) {
+            return (actualRows) -> {
+                if (actualRows.size() != expectedRows.length) {
+                    return false;
+                }
+                final List<String> actualRowsString =
+                        
actualRows.stream().map(Row::toString).collect(Collectors.toList());
+                return CollectionUtils.isEqualCollection(
+                        actualRowsString, Arrays.asList(expectedRows));
+            };
+        }
+
+        public Builder complete() {
+            targetSteps.add(
+                    new SinkTestStep(
+                            name,
+                            schemaComponents,
+                            partitionKeys,
+                            options,
+                            expectedBeforeRestore,
+                            expectedAfterRestore));
+            return rootBuilder;
+        }
+    }
+
+    /** Builder pattern for {@link StatementSetTestStep}. */
+    public static class StatementSetBuilder {
+
+        private final List<TestStep> targetSteps;
+        private final Builder rootBuilder;
+        private final List<String> statements = new ArrayList<>();
+
+        private StatementSetBuilder(List<TestStep> targetSteps, Builder 
rootBuilder) {
+            this.targetSteps = targetSteps;
+            this.rootBuilder = rootBuilder;
+        }
+
+        public StatementSetBuilder withSql(String sql) {
+            this.statements.add(sql);
+            return this;
+        }
+
+        public Builder complete() {
+            this.targetSteps.add(new StatementSetTestStep(statements));
+            return rootBuilder;
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestProgramRunner.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestProgramRunner.java
new file mode 100644
index 00000000000..cd874739135
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestProgramRunner.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.test.program;
+
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Interface for test bases that want to run lists of {@link 
TableTestProgram}s.
+ *
+ * <p>NOTE: See {@link TableTestProgram} for a full example.
+ *
+ * <p>Use {@link #supportedPrograms()} for assertions (usually in test bases), 
and {@link
+ * #programs()} for program lists (usually in final tests).
+ */
+public interface TableTestProgramRunner {
+
+    /**
+     * List of {@link TableTestProgram}s that this runner should run.
+     *
+     * <p>Usually, this list should reference some test programs stored in 
static variables that can
+     * be shared across runners.
+     */
+    List<TableTestProgram> programs();
+
+    /**
+     * Runners should call this method to get started.
+     *
+     * <p>Compared to {@link #programs()}, this method will perform some 
pre-checks.
+     */
+    default List<TableTestProgram> supportedPrograms() {
+        final List<TableTestProgram> programs = programs();
+
+        final List<String> ids = programs.stream().map(p -> 
p.id).collect(Collectors.toList());
+        final List<String> duplicates =
+                ids.stream()
+                        .filter(id -> Collections.frequency(ids, id) > 1)
+                        .distinct()
+                        .collect(Collectors.toList());
+        if (!duplicates.isEmpty()) {
+            throw new IllegalArgumentException("Duplicate test program id 
found: " + duplicates);
+        }
+
+        final Set<TestStep.TestKind> setupSteps = supportedSetupSteps();
+        final Set<TestStep.TestKind> runSteps = supportedRunSteps();
+
+        programs.forEach(
+                p -> {
+                    p.setupSteps.stream()
+                            .map(TestStep::getKind)
+                            .filter(k -> !setupSteps.contains(k))
+                            .findFirst()
+                            .ifPresent(
+                                    k -> {
+                                        throw new 
UnsupportedOperationException(
+                                                "Test runner does not support 
setup step: " + k);
+                                    });
+                    p.runSteps.stream()
+                            .map(TestStep::getKind)
+                            .filter(k -> !runSteps.contains(k))
+                            .findFirst()
+                            .ifPresent(
+                                    k -> {
+                                        throw new 
UnsupportedOperationException(
+                                                "Test runner does not support 
run step: " + k);
+                                    });
+                });
+
+        return programs;
+    }
+
+    /**
+     * Lists setup steps that are supported by this runner.
+     *
+     * <p>E.g. some runners might not want to run generic {@link 
TestStep.TestKind#SQL} because they
+     * want to enrich CREATE TABLE statements.
+     *
+     * <p>This also ensures that runners don't need to be updated when a new 
kind of step is added,
+     * or steps get silently dropped.
+     */
+    EnumSet<TestStep.TestKind> supportedSetupSteps();
+
+    /**
+     * Lists run steps that are supported by this runner.
+     *
+     * <p>E.g. some runners might not want to run generic {@link 
TestStep.TestKind#SQL} because they
+     * want to enrich CREATE TABLE statements.
+     *
+     * <p>This also ensures that runners don't need to be updated when a new 
kind of step is added,
+     * or steps get silently dropped.
+     */
+    EnumSet<TestStep.TestKind> supportedRunSteps();
+}
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestStep.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestStep.java
new file mode 100644
index 00000000000..bff700d9a15
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestStep.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.test.program;
+
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** Abstract class for {@link SourceTestStep} and {@link SinkTestStep}. */
+public abstract class TableTestStep implements TestStep {
+
+    public final String name;
+    public final List<String> schemaComponents;
+    public final List<String> partitionKeys;
+    public final Map<String, String> options;
+
+    TableTestStep(
+            String name,
+            List<String> schemaComponents,
+            List<String> partitionKeys,
+            Map<String, String> options) {
+        this.name = name;
+        this.schemaComponents = schemaComponents;
+        this.partitionKeys = partitionKeys;
+        this.options = options;
+    }
+
+    public TableResult apply(TableEnvironment env) {
+        return apply(env, Collections.emptyMap());
+    }
+
+    public TableResult apply(TableEnvironment env, Map<String, String> 
extraOptions) {
+        final Map<String, String> allOptions = new HashMap<>(options);
+        allOptions.putAll(extraOptions);
+
+        final String partitionedBy =
+                partitionKeys.isEmpty()
+                        ? ""
+                        : "PARTITIONED BY (" + String.join(", ", 
partitionKeys) + ")\n";
+        final String createTable =
+                String.format(
+                        "CREATE TABLE %s (\n%s)\n%sWITH (\n%s)",
+                        name,
+                        String.join(",\n", schemaComponents),
+                        partitionedBy,
+                        allOptions.entrySet().stream()
+                                .map(e -> String.format("'%s'='%s'", 
e.getKey(), e.getValue()))
+                                .collect(Collectors.joining(",\n")));
+
+        return env.executeSql(createTable);
+    }
+}
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TestStep.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TestStep.java
new file mode 100644
index 00000000000..67105b68d53
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TestStep.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.test.program;
+
+/**
+ * Test step that makes up a {@link TableTestProgram}.
+ *
+ * <p>It describes a task that should be executed either before running the 
actual test or as the
+ * main ingredient of the test.
+ *
+ * <p>Some steps provide {@code apply()} methods for convenience. But in the 
end, the {@link
+ * TableTestProgramRunner} decides whether to call them or not.
+ *
+ * <p>Not every {@link TableTestProgramRunner} might support every {@link 
TestKind}.
+ */
+public interface TestStep {
+
+    /**
+     * Enum to identify important properties of a {@link TestStep}.
+     *
+     * <p>Used in {@link TableTestProgramRunner#supportedSetupSteps()} and 
{@link
+     * TableTestProgramRunner#supportedRunSteps()}.
+     */
+    enum TestKind {
+        SQL,
+        STATEMENT_SET,
+        CONFIG,
+        FUNCTION,
+        SOURCE_WITHOUT_DATA,
+        SOURCE_WITH_DATA,
+        SOURCE_WITH_RESTORE_DATA,
+        SINK_WITHOUT_DATA,
+        SINK_WITH_DATA,
+        SINK_WITH_RESTORE_DATA,
+    }
+
+    TestKind getKind();
+}
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/test/program/TableTestProgramRunnerTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/test/program/TableTestProgramRunnerTest.java
new file mode 100644
index 00000000000..e7b58aaa864
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/test/program/TableTestProgramRunnerTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.test.program;
+
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.TableConfigOptions;
+import org.apache.flink.table.test.program.TestStep.TestKind;
+import org.apache.flink.table.utils.UserDefinedFunctions;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.ZoneId;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link TableTestProgram} and {@link TableTestProgramRunner}. */
+public class TableTestProgramRunnerTest {
+
+    private static final String ID = "id";
+    private static final String DESCRIPTION = "description";
+
+    @Test
+    void testConfigStep() {
+        final TableTestProgram program =
+                TableTestProgram.of(ID, DESCRIPTION)
+                        .setupConfig(TableConfigOptions.LOCAL_TIME_ZONE, 
"GMT+3")
+                        .build();
+
+        assertThat(program.setupSteps).hasSize(1);
+
+        final TableEnvironment env = 
TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+        program.getSetupConfigOptionTestSteps().forEach(s -> s.apply(env));
+        
assertThat(env.getConfig().getLocalTimeZone()).isEqualTo(ZoneId.of("GMT+3"));
+    }
+
+    @Test
+    void testFunctionStep() {
+        final TableTestProgram program =
+                TableTestProgram.of(ID, DESCRIPTION)
+                        .setupTemporarySystemFunction(
+                                "tmp_sys", 
UserDefinedFunctions.ScalarUDF.class)
+                        .setupTemporaryCatalogFunction(
+                                "tmp_cat", 
UserDefinedFunctions.ScalarUDF.class)
+                        .setupCatalogFunction("cat", 
UserDefinedFunctions.ScalarUDF.class)
+                        .build();
+
+        assertThat(program.setupSteps).hasSize(3);
+
+        final TableEnvironment env = 
TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+        program.getSetupFunctionTestSteps().forEach(s -> s.apply(env));
+
+        assertThat(env.listUserDefinedFunctions()).contains("tmp_sys", 
"tmp_cat", "cat");
+    }
+
+    @Test
+    void testSqlStep() {
+        final TableTestProgram program =
+                TableTestProgram.of(ID, DESCRIPTION)
+                        .setupSql("CREATE TABLE MyTable1 (i INT) WITH 
('connector' = 'datagen')")
+                        .runSql("CREATE TABLE MyTable2 (i INT) WITH 
('connector' = 'datagen')")
+                        .build();
+
+        assertThat(program.setupSteps).hasSize(1);
+        assertThat(program.runSteps).hasSize(1);
+
+        final TableEnvironment env = 
TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+        program.setupSteps.stream().map(SqlTestStep.class::cast).forEach(s -> 
s.apply(env));
+        program.runSteps.stream().map(SqlTestStep.class::cast).forEach(s -> 
s.apply(env));
+
+        assertThat(env.listTables()).contains("MyTable1", "MyTable2");
+    }
+
+    @Test
+    @SuppressWarnings("resource")
+    void testTableStep() {
+        final TableTestProgram program =
+                TableTestProgram.of(ID, DESCRIPTION)
+                        .setupTableSource("MyTableSource")
+                        .withSchema("i INT")
+                        .withOption("connector", "datagen")
+                        .complete()
+                        .setupTableSource("MyTableSink")
+                        .withSchema("i INT")
+                        .withOption("connector", "blackhole")
+                        .complete()
+                        .build();
+
+        assertThat(program.setupSteps).hasSize(2);
+
+        final TableEnvironment env = 
TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+        program.getSetupSourceTestSteps()
+                .forEach(s -> s.apply(env, 
Collections.singletonMap("number-of-rows", "3")));
+        program.getSetupSinkTestSteps().forEach(s -> s.apply(env));
+
+        assertThat(env.executeSql("SHOW CREATE TABLE 
MyTableSource").collect().next().getField(0))
+                .isEqualTo(
+                        "CREATE TABLE 
`default_catalog`.`default_database`.`MyTableSource` (\n"
+                                + "  `i` INT\n"
+                                + ") WITH (\n"
+                                + "  'connector' = 'datagen',\n"
+                                + "  'number-of-rows' = '3'\n"
+                                + ")\n");
+        assertThat(env.executeSql("SHOW CREATE TABLE 
MyTableSink").collect().next().getField(0))
+                .isEqualTo(
+                        "CREATE TABLE 
`default_catalog`.`default_database`.`MyTableSink` (\n"
+                                + "  `i` INT\n"
+                                + ") WITH (\n"
+                                + "  'connector' = 'blackhole',\n"
+                                + "  'number-of-rows' = '3'\n"
+                                + ")\n");
+    }
+
+    @Test
+    void testRunnerValidationDuplicate() {
+        final TableTestProgram program1 =
+                TableTestProgram.of(ID, DESCRIPTION).runSql("SELECT 
1").build();
+
+        final TableTestProgram program2 =
+                TableTestProgram.of(ID, DESCRIPTION).runSql("SELECT 
1").build();
+
+        final LimitedTableTestProgramRunner runner = new 
LimitedTableTestProgramRunner();
+        runner.programs = Arrays.asList(program1, program2);
+
+        assertThatThrownBy(runner::supportedPrograms)
+                .hasMessageContaining("Duplicate test program id found: [id]");
+    }
+
+    @Test
+    void testRunnerValidationUnsupported() {
+        final LimitedTableTestProgramRunner runner = new 
LimitedTableTestProgramRunner();
+
+        final TableTestProgram program =
+                TableTestProgram.of(ID, DESCRIPTION).setupSql("SELECT 
1").build();
+
+        runner.programs = Collections.singletonList(program);
+
+        assertThatThrownBy(runner::supportedPrograms)
+                .hasMessageContaining("Test runner does not support setup 
step: SQL");
+    }
+
+    private static class LimitedTableTestProgramRunner implements 
TableTestProgramRunner {
+
+        List<TableTestProgram> programs;
+
+        @Override
+        public List<TableTestProgram> programs() {
+            return programs;
+        }
+
+        @Override
+        public EnumSet<TestKind> supportedSetupSteps() {
+            return EnumSet.of(TestKind.SOURCE_WITH_DATA);
+        }
+
+        @Override
+        public EnumSet<TestKind> supportedRunSteps() {
+            return EnumSet.of(TestKind.SQL);
+        }
+    }
+}

Reply via email to