This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 347e4ca6c26 [FLINK-25809][table-api-java] Add table test program infrastructure 347e4ca6c26 is described below commit 347e4ca6c265334a35969d1c8358ff5a9f066e92 Author: Timo Walther <twal...@apache.org> AuthorDate: Wed Oct 25 16:17:27 2023 +0200 [FLINK-25809][table-api-java] Add table test program infrastructure --- .../table/test/program/ConfigOptionTestStep.java | 43 ++ .../flink/table/test/program/FunctionTestStep.java | 75 ++++ .../flink/table/test/program/SinkTestStep.java | 55 +++ .../flink/table/test/program/SourceTestStep.java | 52 +++ .../flink/table/test/program/SqlTestStep.java | 46 ++ .../table/test/program/StatementSetTestStep.java | 46 ++ .../flink/table/test/program/TableTestProgram.java | 472 +++++++++++++++++++++ .../table/test/program/TableTestProgramRunner.java | 112 +++++ .../flink/table/test/program/TableTestStep.java | 73 ++++ .../apache/flink/table/test/program/TestStep.java | 54 +++ .../test/program/TableTestProgramRunnerTest.java | 182 ++++++++ 11 files changed, 1210 insertions(+) diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/ConfigOptionTestStep.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/ConfigOptionTestStep.java new file mode 100644 index 00000000000..7f66122f1cd --- /dev/null +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/ConfigOptionTestStep.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.test.program; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.table.api.TableEnvironment; + +/** Test step for setting a {@link ConfigOption}. */ +public final class ConfigOptionTestStep<T> implements TestStep { + + public final ConfigOption<T> option; + public final T value; + + ConfigOptionTestStep(ConfigOption<T> option, T value) { + this.option = option; + this.value = value; + } + + @Override + public TestKind getKind() { + return TestKind.CONFIG; + } + + public void apply(TableEnvironment env) { + env.getConfig().set(option, value); + } +} diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/FunctionTestStep.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/FunctionTestStep.java new file mode 100644 index 00000000000..ad377bae4cd --- /dev/null +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/FunctionTestStep.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.test.program; + +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.functions.UserDefinedFunction; + +/** Test step for registering a (temporary) (system or catalog) function. */ +public final class FunctionTestStep implements TestStep { + + /** Whether function should be temporary or not. */ + enum FunctionPersistence { + TEMPORARY, + PERSISTENT + } + + /** Whether function should be persisted in a catalog or not. */ + enum FunctionBehavior { + SYSTEM, + CATALOG + } + + public final FunctionPersistence persistence; + public final FunctionBehavior behavior; + public final String name; + public final Class<? extends UserDefinedFunction> function; + + FunctionTestStep( + FunctionPersistence persistence, + FunctionBehavior behavior, + String name, + Class<? 
extends UserDefinedFunction> function) { + this.persistence = persistence; + this.behavior = behavior; + this.name = name; + this.function = function; + } + + @Override + public TestKind getKind() { + return TestKind.FUNCTION; + } + + public void apply(TableEnvironment env) { + if (behavior == FunctionBehavior.SYSTEM) { + if (persistence == FunctionPersistence.TEMPORARY) { + env.createTemporarySystemFunction(name, function); + } else { + throw new UnsupportedOperationException("System functions must be temporary."); + } + } else { + if (persistence == FunctionPersistence.TEMPORARY) { + env.createTemporaryFunction(name, function); + } else { + env.createFunction(name, function); + } + } + } +} diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/SinkTestStep.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/SinkTestStep.java new file mode 100644 index 00000000000..42bfbb9da87 --- /dev/null +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/SinkTestStep.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.test.program; + +import org.apache.flink.types.Row; + +import javax.annotation.Nullable; + +import java.util.List; +import java.util.Map; +import java.util.function.Predicate; + +/** Test step for creating a table sink. */ +public final class SinkTestStep extends TableTestStep { + + public final @Nullable Predicate<List<Row>> expectedBeforeRestore; + public final @Nullable Predicate<List<Row>> expectedAfterRestore; + + SinkTestStep( + String name, + List<String> schemaComponents, + List<String> partitionKeys, + Map<String, String> options, + @Nullable Predicate<List<Row>> expectedBeforeRestore, + @Nullable Predicate<List<Row>> expectedAfterRestore) { + super(name, schemaComponents, partitionKeys, options); + this.expectedBeforeRestore = expectedBeforeRestore; + this.expectedAfterRestore = expectedAfterRestore; + } + + @Override + public TestKind getKind() { + return expectedBeforeRestore == null + ? TestKind.SINK_WITHOUT_DATA + : expectedAfterRestore == null + ? TestKind.SINK_WITH_DATA + : TestKind.SINK_WITH_RESTORE_DATA; + } +} diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/SourceTestStep.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/SourceTestStep.java new file mode 100644 index 00000000000..eec3b1677b0 --- /dev/null +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/SourceTestStep.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.test.program; + +import org.apache.flink.types.Row; + +import java.util.List; +import java.util.Map; + +/** Test step for creating a table source. */ +public final class SourceTestStep extends TableTestStep { + + public final List<Row> dataBeforeRestore; + public final List<Row> dataAfterRestore; + + SourceTestStep( + String name, + List<String> schemaComponents, + List<String> partitionKeys, + Map<String, String> options, + List<Row> dataBeforeRestore, + List<Row> dataAfterRestore) { + super(name, schemaComponents, partitionKeys, options); + this.dataBeforeRestore = dataBeforeRestore; + this.dataAfterRestore = dataAfterRestore; + } + + @Override + public TestKind getKind() { + return dataBeforeRestore.isEmpty() + ? TestKind.SOURCE_WITHOUT_DATA + : dataAfterRestore.isEmpty() + ? TestKind.SOURCE_WITH_DATA + : TestKind.SOURCE_WITH_RESTORE_DATA; + } +} diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/SqlTestStep.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/SqlTestStep.java new file mode 100644 index 00000000000..c6809dc16cf --- /dev/null +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/SqlTestStep.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.test.program; + +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.TableResult; + +/** + * Test step for execution SQL. + * + * <p>Note: Not every runner supports generic SQL statements. Sometimes the runner would like to + * enrich properties e.g. of a CREATE TABLE. Use this step with caution. + */ +public final class SqlTestStep implements TestStep { + + public final String sql; + + SqlTestStep(String sql) { + this.sql = sql; + } + + @Override + public TestKind getKind() { + return TestKind.SQL; + } + + public TableResult apply(TableEnvironment env) { + return env.executeSql(sql); + } +} diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/StatementSetTestStep.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/StatementSetTestStep.java new file mode 100644 index 00000000000..98cf44ecc5a --- /dev/null +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/StatementSetTestStep.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.test.program; + +import org.apache.flink.table.api.StatementSet; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.TableResult; + +import java.util.List; + +/** Test step for creating a statement set. */ +public final class StatementSetTestStep implements TestStep { + + public final List<String> statements; + + StatementSetTestStep(List<String> statements) { + this.statements = statements; + } + + @Override + public TestKind getKind() { + return TestKind.STATEMENT_SET; + } + + public TableResult apply(TableEnvironment env) { + final StatementSet statementSet = env.createStatementSet(); + statements.forEach(statementSet::addInsertSql); + return statementSet.execute(); + } +} diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestProgram.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestProgram.java new file mode 100644 index 00000000000..1b194b39ddd --- /dev/null +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestProgram.java @@ -0,0 +1,472 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.test.program; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigurationUtils; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.functions.UserDefinedFunction; +import org.apache.flink.table.test.program.FunctionTestStep.FunctionBehavior; +import org.apache.flink.table.test.program.FunctionTestStep.FunctionPersistence; +import org.apache.flink.table.test.program.TestStep.TestKind; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +import org.apache.commons.collections.CollectionUtils; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +/** + * A generic declaration of a table program for testing. + * + * <p>A test program defines the basic test pipeline (from source to sink) and required artifacts + * such as table sources and sinks, configuration options, and user-defined functions. Because some + * programs need to create artifacts in a certain order, a program consists of individual {@link + * TestStep}s for setting up the test and the actual running of the test. 
+ * + * <p>Test programs are intended to reduce code duplication and test the same SQL statement through + * different layers of the stack. Different {@link TableTestProgramRunner}s can share the same + * program and enrich it with custom implementation and assertions. + * + * <p>For example, a SQL query such as {@code SELECT * FROM (VALUES (1), (2), (3))} can be declared + * once and can be shared among different tests for integration testing, optimizer plan testing, + * compiled plan testing, transformation testing, and others. + * + * <p>A typical implementation looks like: + * + * <pre>{@code + * // Define the behavior and configuration of an operation. + * public class CalcTestPrograms { + * public static final TableTestProgram CALC_SIMPLE = TableTestProgram.of("calc-simple") ...; + * public static final TableTestProgram CALC_COMPLEX = TableTestProgram.of("calc-complex") ...; + * } + * + * // Define a test base for example for plan testing + * public abstract class PlanTestBase implements TableTestProgramRunner { + * // The test base declares what kind of steps it can apply. + * public Set<TestStep.Kind> supportedSetupSteps() { return EnumSet.of(SOURCE_WITH_DATA, SINK_WITH_DATA); } + * public Set<TestStep.Kind> supportedRunSteps() { return EnumSet.of(SQL); } + * + * // Leave the list of programs up to the concrete test + * public abstract List<TableTestProgram> programs(); + * + * @ParameterizedTest + * @MethodSource("supportedPrograms") + * public void test(TableTestProgram program) { + * TableEnvironment env = ...; + * program.getSetupSourceTestSteps().forEach(s -> s.apply(env)); + * program.getSetupSinkTestSteps().forEach(s -> s.apply(env)); + * assertThat(program.getRunSqlTestStep().apply(env)).contains(...); + * } + * } + * + * // Run the test base for a category of test programs. 
+ * public class CalcPlanTest extends PlanTestBase { + * public List<TableTestProgram> programs() = { return Arrays.asList(CALC_SIMPLE, CALC_COMPLEX); } + * } + * }</pre> + */ +public class TableTestProgram { + + /** Identifier of the test program (e.g. for naming generated files). */ + public final String id; + + /** Description for internal documentation. */ + public final String description; + + /** Steps to be executed for setting up an environment. */ + public final List<TestStep> setupSteps; + + /** Steps to be executed for running the actual test. */ + public final List<TestStep> runSteps; + + private TableTestProgram( + String id, String description, List<TestStep> setupSteps, List<TestStep> runSteps) { + this.id = id; + this.description = description; + this.setupSteps = setupSteps; + this.runSteps = runSteps; + } + + /** + * Entrypoint for a {@link TableTestProgram} that forces an identifier and description of the + * test program. + * + * <p>The identifier is necessary to (ideally globally) identify the test program in outputs. + * For example, a runner for plan tests can create directories and use the name as file names. + * + * <p>The description should give more context and should start with a verb and "s" suffix. + * + * <p>For example: + * + * <ul> + * <li>TableTestProgram.of("join-outer", "tests outer joins") + * <li>TableTestProgram.of("rank-x-enabled", "validates a rank with config flag 'x' set") + * <li>TableTestProgram.of("calc-with-projection", "verifies FLINK-12345 is fixed due to + * missing row projection") + * </ul> + */ + public static Builder of(String id, String description) { + return new Builder(id, description); + } + + /** Convenience method to avoid casting. It assumes that the order of steps is not important. 
*/ + public List<SourceTestStep> getSetupSourceTestSteps() { + final EnumSet<TestKind> sourceKinds = + EnumSet.of( + TestKind.SOURCE_WITHOUT_DATA, + TestKind.SOURCE_WITH_DATA, + TestKind.SOURCE_WITH_RESTORE_DATA); + return setupSteps.stream() + .filter(s -> sourceKinds.contains(s.getKind())) + .map(SourceTestStep.class::cast) + .collect(Collectors.toList()); + } + + /** Convenience method to avoid casting. It assumes that the order of steps is not important. */ + public List<SinkTestStep> getSetupSinkTestSteps() { + final EnumSet<TestKind> sinkKinds = + EnumSet.of( + TestKind.SINK_WITHOUT_DATA, + TestKind.SINK_WITH_DATA, + TestKind.SINK_WITH_RESTORE_DATA); + return setupSteps.stream() + .filter(s -> sinkKinds.contains(s.getKind())) + .map(SinkTestStep.class::cast) + .collect(Collectors.toList()); + } + + /** Convenience method to avoid casting. It assumes that the order of steps is not important. */ + public List<ConfigOptionTestStep<?>> getSetupConfigOptionTestSteps() { + return setupSteps.stream() + .filter(s -> s.getKind() == TestKind.CONFIG) + .map(s -> (ConfigOptionTestStep<?>) s) + .collect(Collectors.toList()); + } + + /** Convenience method to avoid casting. It assumes that the order of steps is not important. */ + public List<FunctionTestStep> getSetupFunctionTestSteps() { + return setupSteps.stream() + .filter(s -> s.getKind() == TestKind.FUNCTION) + .map(FunctionTestStep.class::cast) + .collect(Collectors.toList()); + } + + /** + * Convenience method to avoid boilerplate code. It assumes that only a single SQL statement is + * tested. + */ + public SqlTestStep getRunSqlTestStep() { + Preconditions.checkArgument( + runSteps.size() == 1 && runSteps.get(0).getKind() == TestKind.SQL, + "Single SQL step expected."); + return (SqlTestStep) runSteps.get(0); + } + + /** Builder pattern for {@link TableTestProgram}. 
*/ + public static class Builder { + + private final String id; + private final String description; + private final List<TestStep> setupSteps = new ArrayList<>(); + private final List<TestStep> runSteps = new ArrayList<>(); + + private Builder(String id, String description) { + this.id = id; + this.description = description; + } + + /** + * Setup step for execution SQL. + * + * <p>Note: Not every runner supports generic SQL statements. Sometimes the runner would + * like to enrich properties e.g. of a CREATE TABLE. Use this step with caution. + */ + public Builder setupSql(String sql) { + this.setupSteps.add(new SqlTestStep(sql)); + return this; + } + + /** Setup step for setting a {@link ConfigOption}. */ + public <T> Builder setupConfig(ConfigOption<T> option, T value) { + this.setupSteps.add(new ConfigOptionTestStep<>(option, value)); + return this; + } + + /** Setup step for registering a temporary system function. */ + public Builder setupTemporarySystemFunction( + String name, Class<? extends UserDefinedFunction> function) { + this.setupSteps.add( + new FunctionTestStep( + FunctionPersistence.TEMPORARY, + FunctionBehavior.SYSTEM, + name, + function)); + return this; + } + + /** Setup step for registering a temporary catalog function. */ + public Builder setupTemporaryCatalogFunction( + String name, Class<? extends UserDefinedFunction> function) { + this.setupSteps.add( + new FunctionTestStep( + FunctionPersistence.TEMPORARY, + FunctionBehavior.CATALOG, + name, + function)); + return this; + } + + /** Setup step for registering a catalog function. */ + public Builder setupCatalogFunction( + String name, Class<? extends UserDefinedFunction> function) { + this.setupSteps.add( + new FunctionTestStep( + FunctionPersistence.PERSISTENT, + FunctionBehavior.CATALOG, + name, + function)); + return this; + } + + /** Setup step for building a table source. 
*/ + public SourceBuilder setupTableSource(String name) { + return new SourceBuilder(name, setupSteps, this); + } + + /** Setup step for building a table sink. */ + public SinkBuilder setupTableSink(String name) { + return new SinkBuilder(name, setupSteps, this); + } + + /** Run step for executing SQL. */ + public Builder runSql(String sql) { + this.runSteps.add(new SqlTestStep(sql)); + return this; + } + + /** Run step for executing a statement set. */ + public StatementSetBuilder runStatementSet() { + return new StatementSetBuilder(runSteps, this); + } + + public TableTestProgram build() { + return new TableTestProgram(id, description, setupSteps, runSteps); + } + } + + /** Builder pattern for {@link SourceTestStep} and {@link SinkTestStep}. */ + @SuppressWarnings("unchecked") + private static class TableBuilder<SpecificBuilder extends TableBuilder<SpecificBuilder>> { + + protected final String name; + protected final List<TestStep> targetSteps; + protected final Builder rootBuilder; + + protected final List<String> schemaComponents = new ArrayList<>(); + protected final List<String> partitionKeys = new ArrayList<>(); + protected final Map<String, String> options = new HashMap<>(); + + private TableBuilder(String name, List<TestStep> targetSteps, Builder rootBuilder) { + this.name = name; + this.targetSteps = targetSteps; + this.rootBuilder = rootBuilder; + } + + /** + * Define the schema like you would in SQL e.g. "my_col INT", "PRIMARY KEY (uid) NOT + * ENFORCED", or "WATERMARK FOR ts AS ts". + */ + public SpecificBuilder withSchema(String... schemaComponents) { + this.schemaComponents.addAll(Arrays.asList(schemaComponents)); + return (SpecificBuilder) this; + } + + /** + * Unless the test requires a very specific configuration, try to avoid calling this method + * and fill in options later via {@link TableTestStep#apply(TableEnvironment, Map)}. 
+ */ + public SpecificBuilder withOptions(Map<String, String> options) { + this.options.putAll(options); + return (SpecificBuilder) this; + } + + /** + * Unless the test requires a very specific configuration, try to avoid calling this method + * and fill in options later via {@link TableTestStep#apply(TableEnvironment, Map)}. + */ + public SpecificBuilder withOption(String key, String value) { + this.options.put(key, value); + return (SpecificBuilder) this; + } + + /** + * Unless the test requires a very specific configuration, try to avoid calling this method + * and fill in options later via {@link TableTestStep#apply(TableEnvironment, Map)}. + */ + public <T> SpecificBuilder withOption(ConfigOption<T> option, String value) { + this.options.put(option.key(), ConfigurationUtils.convertValue(value, String.class)); + return (SpecificBuilder) this; + } + + public SpecificBuilder withPartitionKeys(String... partitionKeys) { + this.partitionKeys.addAll(Arrays.asList(partitionKeys)); + return (SpecificBuilder) this; + } + } + + /** Builder pattern for {@link SourceTestStep}. */ + public static class SourceBuilder extends TableBuilder<SourceBuilder> { + + private final List<Row> dataBeforeRestore = new ArrayList<>(); + private final List<Row> dataAfterRestore = new ArrayList<>(); + + private SourceBuilder(String name, List<TestStep> targetSteps, Builder rootBuilder) { + super(name, targetSteps, rootBuilder); + } + + public SourceBuilder withValues(Row... data) { + return withValuesBeforeRestore(data); + } + + public SourceBuilder withValuesBeforeRestore(Row... data) { + this.dataBeforeRestore.addAll(Arrays.asList(data)); + return this; + } + + public SourceBuilder withValuesAfterRestore(Row... 
data) { + this.dataAfterRestore.addAll(Arrays.asList(data)); + return this; + } + + public Builder complete() { + targetSteps.add( + new SourceTestStep( + name, + schemaComponents, + partitionKeys, + options, + dataBeforeRestore, + dataAfterRestore)); + return rootBuilder; + } + } + + /** Builder pattern for {@link SinkTestStep}. */ + public static class SinkBuilder extends TableBuilder<SinkBuilder> { + + private Predicate<List<Row>> expectedBeforeRestore; + private Predicate<List<Row>> expectedAfterRestore; + + private SinkBuilder(String name, List<TestStep> targetSteps, Builder rootBuilder) { + super(name, targetSteps, rootBuilder); + } + + public SinkBuilder withExpectedValues(Row... expectedRows) { + return withValuesBeforeRestore(expectedRows); + } + + public SinkBuilder withExpectedValues(String... expectedRows) { + return withValuesBeforeRestore(expectedRows); + } + + public SinkBuilder withValuesBeforeRestore(Row... expectedRows) { + this.expectedBeforeRestore = equalIgnoringOrder(expectedRows); + return this; + } + + public SinkBuilder withValuesBeforeRestore(String... expectedRows) { + this.expectedBeforeRestore = equalIgnoringOrder(expectedRows); + return this; + } + + public SinkBuilder withValuesAfterRestore(Row... expectedRows) { + this.expectedAfterRestore = equalIgnoringOrder(expectedRows); + return this; + } + + public SinkBuilder withValuesAfterRestore(String... expectedRows) { + this.expectedAfterRestore = equalIgnoringOrder(expectedRows); + return this; + } + + private static Predicate<List<Row>> equalIgnoringOrder(Row... expectedRows) { + return (actualRows) -> { + if (actualRows.size() != expectedRows.length) { + return false; + } + return CollectionUtils.isEqualCollection(actualRows, Arrays.asList(expectedRows)); + }; + } + + private static Predicate<List<Row>> equalIgnoringOrder(String... 
expectedRows) { + return (actualRows) -> { + if (actualRows.size() != expectedRows.length) { + return false; + } + final List<String> actualRowsString = + actualRows.stream().map(Row::toString).collect(Collectors.toList()); + return CollectionUtils.isEqualCollection( + actualRowsString, Arrays.asList(expectedRows)); + }; + } + + public Builder complete() { + targetSteps.add( + new SinkTestStep( + name, + schemaComponents, + partitionKeys, + options, + expectedBeforeRestore, + expectedAfterRestore)); + return rootBuilder; + } + } + + /** Builder pattern for {@link StatementSetTestStep}. */ + public static class StatementSetBuilder { + + private final List<TestStep> targetSteps; + private final Builder rootBuilder; + private final List<String> statements = new ArrayList<>(); + + private StatementSetBuilder(List<TestStep> targetSteps, Builder rootBuilder) { + this.targetSteps = targetSteps; + this.rootBuilder = rootBuilder; + } + + public StatementSetBuilder withSql(String sql) { + this.statements.add(sql); + return this; + } + + public Builder complete() { + this.targetSteps.add(new StatementSetTestStep(statements)); + return rootBuilder; + } + } +} diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestProgramRunner.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestProgramRunner.java new file mode 100644 index 00000000000..cd874739135 --- /dev/null +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestProgramRunner.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.test.program; + +import java.util.Collections; +import java.util.EnumSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Interface for test bases that want to run lists of {@link TableTestProgram}s. + * + * <p>NOTE: See {@link TableTestProgram} for a full example. + * + * <p>Use {@link #supportedPrograms()} for assertions (usually in test bases), and {@link + * #programs()} for program lists (usually in final tests). + */ +public interface TableTestProgramRunner { + + /** + * List of {@link TableTestProgram}s that this runner should run. + * + * <p>Usually, this list should reference some test programs stored in static variables that can + * be shared across runners. + */ + List<TableTestProgram> programs(); + + /** + * Runners should call this method to get started. + * + * <p>Compared to {@link #programs()}, this method will perform some pre-checks. 
+ */ + default List<TableTestProgram> supportedPrograms() { + final List<TableTestProgram> programs = programs(); + + final List<String> ids = programs.stream().map(p -> p.id).collect(Collectors.toList()); + final List<String> duplicates = + ids.stream() + .filter(id -> Collections.frequency(ids, id) > 1) + .distinct() + .collect(Collectors.toList()); + if (!duplicates.isEmpty()) { + throw new IllegalArgumentException("Duplicate test program id found: " + duplicates); + } + + final Set<TestStep.TestKind> setupSteps = supportedSetupSteps(); + final Set<TestStep.TestKind> runSteps = supportedRunSteps(); + + programs.forEach( + p -> { + p.setupSteps.stream() + .map(TestStep::getKind) + .filter(k -> !setupSteps.contains(k)) + .findFirst() + .ifPresent( + k -> { + throw new UnsupportedOperationException( + "Test runner does not support setup step: " + k); + }); + p.runSteps.stream() + .map(TestStep::getKind) + .filter(k -> !runSteps.contains(k)) + .findFirst() + .ifPresent( + k -> { + throw new UnsupportedOperationException( + "Test runner does not support run step: " + k); + }); + }); + + return programs; + } + + /** + * Lists setup steps that are supported by this runner. + * + * <p>E.g. some runners might not want to run generic {@link TestStep.TestKind#SQL} because they + * want to enrich CREATE TABLE statements. + * + * <p>This also ensures that runners don't need to be updated when a new kind of step is added, + * or steps get silently dropped. + */ + EnumSet<TestStep.TestKind> supportedSetupSteps(); + + /** + * Lists run steps that are supported by this runner. + * + * <p>E.g. some runners might not want to run generic {@link TestStep.TestKind#SQL} because they + * want to enrich CREATE TABLE statements. + * + * <p>This also ensures that runners don't need to be updated when a new kind of step is added, + * or steps get silently dropped. 
+ */ + EnumSet<TestStep.TestKind> supportedRunSteps(); +} diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestStep.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestStep.java new file mode 100644 index 00000000000..bff700d9a15 --- /dev/null +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestStep.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.test.program; + +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.TableResult; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** Abstract class for {@link SourceTestStep} and {@link SinkTestStep}. 
*/ +public abstract class TableTestStep implements TestStep { + + public final String name; + public final List<String> schemaComponents; + public final List<String> partitionKeys; + public final Map<String, String> options; + + TableTestStep( + String name, + List<String> schemaComponents, + List<String> partitionKeys, + Map<String, String> options) { + this.name = name; + this.schemaComponents = schemaComponents; + this.partitionKeys = partitionKeys; + this.options = options; + } + + public TableResult apply(TableEnvironment env) { + return apply(env, Collections.emptyMap()); + } + + public TableResult apply(TableEnvironment env, Map<String, String> extraOptions) { + final Map<String, String> allOptions = new HashMap<>(options); + allOptions.putAll(extraOptions); + + final String partitionedBy = + partitionKeys.isEmpty() + ? "" + : "PARTITIONED BY (" + String.join(", ", partitionKeys) + ")\n"; + final String createTable = + String.format( + "CREATE TABLE %s (\n%s)\n%sWITH (\n%s)", + name, + String.join(",\n", schemaComponents), + partitionedBy, + allOptions.entrySet().stream() + .map(e -> String.format("'%s'='%s'", e.getKey(), e.getValue())) + .collect(Collectors.joining(",\n"))); + + return env.executeSql(createTable); + } +} diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TestStep.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TestStep.java new file mode 100644 index 00000000000..67105b68d53 --- /dev/null +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TestStep.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
/**
 * Test step that makes up a {@link TableTestProgram}.
 *
 * <p>A step describes a task that is executed either before running the actual test (setup) or as
 * the main ingredient of the test (run).
 *
 * <p>Some steps provide {@code apply()} methods for convenience. But in the end, the {@link
 * TableTestProgramRunner} decides whether to call them or not.
 *
 * <p>Not every {@link TableTestProgramRunner} might support every {@link TestKind}.
 */
public interface TestStep {

    /**
     * Enum to identify important properties of a {@link TestStep}.
     *
     * <p>Used in {@link TableTestProgramRunner#supportedSetupSteps()} and {@link
     * TableTestProgramRunner#supportedRunSteps()}.
     */
    enum TestKind {
        SQL,
        STATEMENT_SET,
        CONFIG,
        FUNCTION,
        SOURCE_WITHOUT_DATA,
        SOURCE_WITH_DATA,
        SOURCE_WITH_RESTORE_DATA,
        SINK_WITHOUT_DATA,
        SINK_WITH_DATA,
        SINK_WITH_RESTORE_DATA
    }

    /** Identifies the kind of this step so runners can check support for it. */
    TestKind getKind();
}
+ */ + +package org.apache.flink.table.test.program; + +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.config.TableConfigOptions; +import org.apache.flink.table.test.program.TestStep.TestKind; +import org.apache.flink.table.utils.UserDefinedFunctions; + +import org.junit.jupiter.api.Test; + +import java.time.ZoneId; +import java.util.Arrays; +import java.util.Collections; +import java.util.EnumSet; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link TableTestProgram} and {@link TableTestProgramRunner}. */ +public class TableTestProgramRunnerTest { + + private static final String ID = "id"; + private static final String DESCRIPTION = "description"; + + @Test + void testConfigStep() { + final TableTestProgram program = + TableTestProgram.of(ID, DESCRIPTION) + .setupConfig(TableConfigOptions.LOCAL_TIME_ZONE, "GMT+3") + .build(); + + assertThat(program.setupSteps).hasSize(1); + + final TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode()); + program.getSetupConfigOptionTestSteps().forEach(s -> s.apply(env)); + assertThat(env.getConfig().getLocalTimeZone()).isEqualTo(ZoneId.of("GMT+3")); + } + + @Test + void testFunctionStep() { + final TableTestProgram program = + TableTestProgram.of(ID, DESCRIPTION) + .setupTemporarySystemFunction( + "tmp_sys", UserDefinedFunctions.ScalarUDF.class) + .setupTemporaryCatalogFunction( + "tmp_cat", UserDefinedFunctions.ScalarUDF.class) + .setupCatalogFunction("cat", UserDefinedFunctions.ScalarUDF.class) + .build(); + + assertThat(program.setupSteps).hasSize(3); + + final TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode()); + program.getSetupFunctionTestSteps().forEach(s -> s.apply(env)); + + assertThat(env.listUserDefinedFunctions()).contains("tmp_sys", 
"tmp_cat", "cat"); + } + + @Test + void testSqlStep() { + final TableTestProgram program = + TableTestProgram.of(ID, DESCRIPTION) + .setupSql("CREATE TABLE MyTable1 (i INT) WITH ('connector' = 'datagen')") + .runSql("CREATE TABLE MyTable2 (i INT) WITH ('connector' = 'datagen')") + .build(); + + assertThat(program.setupSteps).hasSize(1); + assertThat(program.runSteps).hasSize(1); + + final TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode()); + program.setupSteps.stream().map(SqlTestStep.class::cast).forEach(s -> s.apply(env)); + program.runSteps.stream().map(SqlTestStep.class::cast).forEach(s -> s.apply(env)); + + assertThat(env.listTables()).contains("MyTable1", "MyTable2"); + } + + @Test + @SuppressWarnings("resource") + void testTableStep() { + final TableTestProgram program = + TableTestProgram.of(ID, DESCRIPTION) + .setupTableSource("MyTableSource") + .withSchema("i INT") + .withOption("connector", "datagen") + .complete() + .setupTableSource("MyTableSink") + .withSchema("i INT") + .withOption("connector", "blackhole") + .complete() + .build(); + + assertThat(program.setupSteps).hasSize(2); + + final TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode()); + program.getSetupSourceTestSteps() + .forEach(s -> s.apply(env, Collections.singletonMap("number-of-rows", "3"))); + program.getSetupSinkTestSteps().forEach(s -> s.apply(env)); + + assertThat(env.executeSql("SHOW CREATE TABLE MyTableSource").collect().next().getField(0)) + .isEqualTo( + "CREATE TABLE `default_catalog`.`default_database`.`MyTableSource` (\n" + + " `i` INT\n" + + ") WITH (\n" + + " 'connector' = 'datagen',\n" + + " 'number-of-rows' = '3'\n" + + ")\n"); + assertThat(env.executeSql("SHOW CREATE TABLE MyTableSink").collect().next().getField(0)) + .isEqualTo( + "CREATE TABLE `default_catalog`.`default_database`.`MyTableSink` (\n" + + " `i` INT\n" + + ") WITH (\n" + + " 'connector' = 'blackhole',\n" + + " 'number-of-rows' = '3'\n" + + 
")\n"); + } + + @Test + void testRunnerValidationDuplicate() { + final TableTestProgram program1 = + TableTestProgram.of(ID, DESCRIPTION).runSql("SELECT 1").build(); + + final TableTestProgram program2 = + TableTestProgram.of(ID, DESCRIPTION).runSql("SELECT 1").build(); + + final LimitedTableTestProgramRunner runner = new LimitedTableTestProgramRunner(); + runner.programs = Arrays.asList(program1, program2); + + assertThatThrownBy(runner::supportedPrograms) + .hasMessageContaining("Duplicate test program id found: [id]"); + } + + @Test + void testRunnerValidationUnsupported() { + final LimitedTableTestProgramRunner runner = new LimitedTableTestProgramRunner(); + + final TableTestProgram program = + TableTestProgram.of(ID, DESCRIPTION).setupSql("SELECT 1").build(); + + runner.programs = Collections.singletonList(program); + + assertThatThrownBy(runner::supportedPrograms) + .hasMessageContaining("Test runner does not support setup step: SQL"); + } + + private static class LimitedTableTestProgramRunner implements TableTestProgramRunner { + + List<TableTestProgram> programs; + + @Override + public List<TableTestProgram> programs() { + return programs; + } + + @Override + public EnumSet<TestKind> supportedSetupSteps() { + return EnumSet.of(TestKind.SOURCE_WITH_DATA); + } + + @Override + public EnumSet<TestKind> supportedRunSteps() { + return EnumSet.of(TestKind.SQL); + } + } +}