This is an automated email from the ASF dual-hosted git repository. anton pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 2dc4667 [SQL] Add custom table name resolution new a44f7d8 Merge pull request #9343 from akedin/custom-table-name-resolution 2dc4667 is described below commit 2dc46671311b0203db5aaf2906e0ad8c21ed8b14 Author: akedin <ke...@google.com> AuthorDate: Tue Apr 30 16:04:31 2019 -0700 [SQL] Add custom table name resolution --- .../extensions/sql/TableNameExtractionUtils.java | 98 ++++ .../extensions/sql/impl/CalciteQueryPlanner.java | 5 +- .../beam/sdk/extensions/sql/impl/TableName.java | 100 ++++ .../extensions/sql/impl/TableResolutionUtils.java | 214 +++++++++ .../extensions/sql/meta/CustomTableResolver.java | 45 ++ .../CalciteCannotParseSimpleIdentifiersTest.java | 77 +++ .../sql/CalciteParsesSimpleIdentifiersTest.java | 133 ++++++ .../sql/meta/CustomTableResolverTest.java | 514 +++++++++++++++++++++ 8 files changed, 1185 insertions(+), 1 deletion(-) diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/TableNameExtractionUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/TableNameExtractionUtils.java new file mode 100644 index 0000000..556c246 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/TableNameExtractionUtils.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql; + +import static java.util.stream.Collectors.toList; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import org.apache.beam.sdk.extensions.sql.impl.TableName; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; +import org.apache.calcite.sql.SqlAsOperator; +import org.apache.calcite.sql.SqlCall; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlJoin; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlSelect; +import org.apache.calcite.sql.SqlSetOperator; + +/** + * Helper class to extract table identifiers from the query. + * + * <p>Supports queries: + * + * <pre> + * ... FROM table... + * ... FROM table1, table2 AS x... + * ... FROM table1 JOIN (LEFT, INNER, OUTER etc) table2 JOIN table3 ... + * ... FROM table1 UNION (INTERSECT etc) SELECT ... 
+ * </pre> + */ +public class TableNameExtractionUtils { + + public static List<TableName> extractTableNamesFromNode(SqlNode node) { + if (node instanceof SqlSelect) { + return extractTableFromSelect((SqlSelect) node); + } + + if (node instanceof SqlIdentifier) { + return extractFromIdentifier((SqlIdentifier) node); + } + + if (node instanceof SqlJoin) { + return extractFromJoin((SqlJoin) node); + } + + if (node instanceof SqlCall) { + return extractFromCall((SqlCall) node); + } + + return Collections.emptyList(); + } + + private static List<TableName> extractTableFromSelect(SqlSelect node) { + return extractTableNamesFromNode(node.getFrom()); + } + + private static List<TableName> extractFromCall(SqlCall node) { + if (node.getOperator() instanceof SqlAsOperator) { + return extractTableNamesFromNode(node.getOperandList().get(0)); + } + + if (node.getOperator() instanceof SqlSetOperator) { + return node.getOperandList().stream() + .map(TableNameExtractionUtils::extractTableNamesFromNode) + .flatMap(Collection::stream) + .collect(toList()); + } + + return Collections.emptyList(); + } + + private static List<TableName> extractFromJoin(SqlJoin join) { + return ImmutableList.<TableName>builder() + .addAll(extractTableNamesFromNode(join.getLeft())) + .addAll(extractTableNamesFromNode(join.getRight())) + .build(); + } + + private static List<TableName> extractFromIdentifier(SqlIdentifier identifier) { + return ImmutableList.of(TableName.create(identifier.names)); + } +} diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java index 10b529c..8215346 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java @@ -71,9 +71,11 @@ class 
CalciteQueryPlanner implements QueryPlanner { private static final Logger LOG = LoggerFactory.getLogger(CalciteQueryPlanner.class); private final Planner planner; + private final JdbcConnection connection; public CalciteQueryPlanner(JdbcConnection connection, RuleSet[] ruleSets) { - planner = Frameworks.getPlanner(defaultConfig(connection, ruleSets)); + this.connection = connection; + this.planner = Frameworks.getPlanner(defaultConfig(connection, ruleSets)); } public FrameworkConfig defaultConfig(JdbcConnection connection, RuleSet[] ruleSets) { @@ -138,6 +140,7 @@ class CalciteQueryPlanner implements QueryPlanner { BeamRelNode beamRelNode; try { SqlNode parsed = planner.parse(sqlStatement); + TableResolutionUtils.setupCustomTableResolution(connection, parsed); SqlNode validated = planner.validate(parsed); LOG.info("SQL:\n" + validated); diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/TableName.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/TableName.java new file mode 100644 index 0000000..282f0c2 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/TableName.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.impl; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import static java.util.stream.Collectors.toList; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState; + +import com.google.auto.value.AutoValue; +import java.util.Collections; +import java.util.List; + +/** Represents a parsed table name that is specified in a FROM clause (and other places). */ +@AutoValue +public abstract class TableName { + + /** + * Table path up to the leaf table name. + * + * <p>Does not necessarily start from a schema name. + * + * <p>Does not include the actual table name, see {@link #getTableName()}. + */ + public abstract List<String> getPath(); + + /** Table name, the last element of the fully-specified table name with path. */ + public abstract String getTableName(); + + /** Full table name with path. 
*/ + public static TableName create(List<String> fullPath) { + checkNotNull(fullPath, "Full table path cannot be null"); + checkArgument(fullPath.size() > 0, "Full table path has to have at least one element"); + return create(fullPath.subList(0, fullPath.size() - 1), fullPath.get(fullPath.size() - 1)); + } + + /** Table name plus the path up to but not including table name. */ + public static TableName create(List<String> path, String tableName) { + checkNotNull(tableName, "Table name cannot be null"); + return new AutoValue_TableName(path == null ? Collections.emptyList() : path, tableName); + } + + /** Whether it's a compound table name (with multiple path components). */ + public boolean isCompound() { + return getPath().size() > 0; + } + + /** Whether it's a simple name, with a single name component. */ + public boolean isSimple() { + return getPath().size() == 0; + } + + /** First element in the path. */ + public String getPrefix() { + checkState(isCompound()); + return getPath().get(0); + } + + /** + * Remove prefix, e.g. this is helpful when stripping the top-level schema to register a table + * name with a provider. + */ + public TableName removePrefix() { + List<String> pathPostfix = getPath().stream().skip(1).collect(toList()); + return TableName.create(pathPostfix, getTableName()); + } +} diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/TableResolutionUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/TableResolutionUtils.java new file mode 100644 index 0000000..af6146b --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/TableResolutionUtils.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.impl; + +import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toMap; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState; + +import java.sql.SQLException; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.beam.sdk.extensions.sql.TableNameExtractionUtils; +import org.apache.beam.sdk.extensions.sql.meta.CustomTableResolver; +import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; +import org.apache.calcite.jdbc.CalciteSchema; +import org.apache.calcite.sql.SqlNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** TableResolutionUtils. */ +public class TableResolutionUtils { + + private static final Logger LOG = LoggerFactory.getLogger(TableResolutionUtils.class); + + /** + * Extract table names from the FROM clauses, register them with root TableProviders that support + * custom table schema resolution, e.g. DataCatalog. + * + * <p>Go over top-level schemas in the JdbcConnection, and for all top-level table providers that + * support custom table resolution, register all the parsed table names with them. 
+ * + * <p>This way when a table provider has custom name-resolution strategy it can analyze whether it + * supports the name without using Calcite's logic. E.g. for DataCatalog we need to assemble the + * table name back into a single string and then query the back-end, whereas Calcite would require + * us to call the back-end for each part of the table name. + * + * <p>The logic is: + * + * <pre> + * - if it's a compound identifier (table name contains multiple parts): + * - get the first part of the identifier, assume it represents a top-level schema; + * - find a top-level table provider with the same name; + * - register the table identifier with it, if supported; + * - if not supported, then ignore the table identifier, everything will be resolved using + * existing Calcite's logic; + * + * - if it's a simple identifier (contains only a table name without a schema part), + * or if there was no matching top-level schema: + * - register with the default schema, if it supports custom table resolution; + * - if it does not, existing Calcite logic will still work as is; + * </pre> + */ + static void setupCustomTableResolution(JdbcConnection connection, SqlNode parsed) { + List<TableName> tableNames = TableNameExtractionUtils.extractTableNamesFromNode(parsed); + String currentSchemaName = getCurrentSchemaName(connection); + + SchemaWithName defaultSchema = SchemaWithName.create(connection, currentSchemaName); + + if (defaultSchema.supportsCustomResolution()) { + registerWithDefaultSchema(connection, tableNames, defaultSchema); + } + + registerWithTopLevelSchemas(connection, tableNames); + } + + /** Current (default) schema name in the JdbcConnection. */ + private static String getCurrentSchemaName(JdbcConnection connection) { + try { + return connection.getSchema(); + } catch (SQLException e) { + throw new IllegalStateException( + "Unable to get current schema name from JdbcConnection. 
" + + "Assuming table names in the query are fully-qualified from the root.", + e); + } + } + + /** + * Simple identifiers have to be resolved by the default schema, as well as compoung identifiers + * that don't have a matching top-level schema (meaning that a user didn't specify a top-level + * schema and expected it to be inferred). + */ + private static void registerWithDefaultSchema( + JdbcConnection connection, List<TableName> tableNames, SchemaWithName defaultSchema) { + Set<String> topLevelSchemas = connection.getRootSchema().getSubSchemaNames(); + + List<TableName> simpleIdentifiers = + tableNames.stream().filter(TableName::isSimple).collect(toList()); + + List<TableName> withoutMatchingSchemas = + tableNames.stream() + .filter(name -> name.isCompound() && !topLevelSchemas.contains(name.getPrefix())) + .collect(toList()); + + List<TableName> explicitlyInDefaulSchema = + tableNames.stream() + .filter(name -> name.isCompound() && name.getPrefix().equals(defaultSchema.name)) + .map(TableName::removePrefix) + .collect(toList()); + + List<TableName> shouldGoIntoDefaultSchema = + ImmutableList.<TableName>builder() + .addAll(simpleIdentifiers) + .addAll(withoutMatchingSchemas) + .addAll(explicitlyInDefaulSchema) + .build(); + + defaultSchema.getCustomTableResolver().registerKnownTableNames(shouldGoIntoDefaultSchema); + } + + /** + * Register compound table identifiers with the matching custom resolvers that correspond to the + * top-level schemas. + */ + private static void registerWithTopLevelSchemas( + JdbcConnection connection, List<TableName> tableNames) { + + Map<String, CustomTableResolver> topLevelResolvers = getCustomTopLevelResolvers(connection); + + topLevelResolvers.forEach( + (topLevelSchemaName, resolver) -> + resolver.registerKnownTableNames(tablesForSchema(tableNames, topLevelSchemaName))); + } + + /** Get the custom schema resolvers for all top-level schemas that support custom resolution. 
*/ + private static Map<String, CustomTableResolver> getCustomTopLevelResolvers( + JdbcConnection connection) { + return connection.getRootSchema().getSubSchemaNames().stream() + .map(topLevelSchemaName -> SchemaWithName.create(connection, topLevelSchemaName)) + .filter(schema -> !schema.getName().equals(getCurrentSchemaName(connection))) + .filter(SchemaWithName::supportsCustomResolution) + .collect(toMap(SchemaWithName::getName, SchemaWithName::getCustomTableResolver)); + } + + /** + * Get the compound identifiers that have the first component matching the given top-level schema + * name and remove the first component. + */ + private static List<TableName> tablesForSchema( + List<TableName> tableNames, String topLevelSchema) { + return tableNames.stream() + .filter(TableName::isCompound) + .filter(t -> t.getPrefix().equals(topLevelSchema)) + .map(TableName::removePrefix) + .collect(toList()); + } + + /** + * A utility class that keeps track of schema name and other properties. + * + * <p>Sole purpose is to reduce inline boilerplate and encapsulate stuff. + */ + private static class SchemaWithName { + String name; + org.apache.calcite.schema.Schema schema; + + static SchemaWithName create(JdbcConnection connection, String name) { + SchemaWithName schemaWithName = new SchemaWithName(); + schemaWithName.name = name; + schemaWithName.schema = + CalciteSchema.from(connection.getRootSchema().getSubSchema(name)).schema; + return schemaWithName; + } + + /** Whether this schema/table provider supports custom table resolution. */ + boolean supportsCustomResolution() { + return isBeamSchema() && tableProviderSupportsCustomResolution(); + } + + /** Whether this Calcite schema is actually an instance of BeamCalciteSchema. */ + boolean isBeamSchema() { + return schema instanceof BeamCalciteSchema; + } + + /** Whether the table provider is an instance of CustomTableResolver. 
*/ + boolean tableProviderSupportsCustomResolution() { + return getTableProvider() instanceof CustomTableResolver; + } + + /** Gets the table provider that backs the BeamCalciteSchema. */ + TableProvider getTableProvider() { + checkState(isBeamSchema()); + return ((BeamCalciteSchema) schema).getTableProvider(); + } + + /** Schema name. */ + String getName() { + return name; + } + + /** Custom table resolver in the provider. */ + CustomTableResolver getCustomTableResolver() { + checkState(supportsCustomResolution()); + return (CustomTableResolver) getTableProvider(); + } + } +} diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/CustomTableResolver.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/CustomTableResolver.java new file mode 100644 index 0000000..fc066bd --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/CustomTableResolver.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.sql.meta; + +import java.util.List; +import org.apache.beam.sdk.extensions.sql.impl.TableName; +import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider; + +/** + * Interface that table providers can implement if they require custom table name resolution. + * + * <p>{@link #registerKnownTableNames(List)} is called by the parser/planner and takes the list of + * all tables mentioned in the query. Then when normal Calcite lifecycle is executed the table + * provider can now check against this list and perform custom resolution. This is a workaround for + * lack of context in Calcite's logic, e.g. it's impossible to receive the whole table name at once, + * or understand that it has done querying sub-schemas and expects a table. + */ +public interface CustomTableResolver extends TableProvider { + + /** + * Register the table names as extracted from the FROM clause. + * + * <p>Calcite doesn't provide these full names to table providers and queries them with individual + * parts of the identifiers without giving any extra context. So if a table provider needs to + * implement some custom table name resolution strategy it doesn't have information to do so. E.g. + * if you want to take the compound SQL identifiers that were originally split by dots, join them + * into a single string, and then query a back-end service, this interface makes this possible. 
+ */ + void registerKnownTableNames(List<TableName> tableNames); +} diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/CalciteCannotParseSimpleIdentifiersTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/CalciteCannotParseSimpleIdentifiersTest.java new file mode 100644 index 0000000..c696c28 --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/CalciteCannotParseSimpleIdentifiersTest.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql; + +import static org.junit.Assert.assertThrows; + +import java.io.Serializable; +import java.util.Arrays; +import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv; +import org.apache.beam.sdk.extensions.sql.impl.ParseException; +import org.apache.beam.sdk.testing.TestPipeline; +import org.junit.Rule; +import org.junit.Test; +import org.junit.function.ThrowingRunnable; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +/** + * Examples of simple identifiers that Calcite is unable to parse. + * + * <p>Not an exhaustive list. 
+ */ +@RunWith(Parameterized.class) +public class CalciteCannotParseSimpleIdentifiersTest implements Serializable { + + @Rule public final transient TestPipeline pipeline = TestPipeline.create(); + private final String input; + + @Parameters(name = "{0}") + public static Iterable<Object> data() { + return Arrays.asList( + new Object[] { + "field id", + "field\nid", + "`field\nid`", + "field`id", + "field\\id", + "field``id", + "field\bid", + "field=id", + "field+id", + "field{id}", + "field.id", + "field\r_id", + "`field\r_id`" + }); + } + + public CalciteCannotParseSimpleIdentifiersTest(String input) { + this.input = input; + } + + @Test + public void testFailsToParseAlias() { + assertThrows(ParseException.class, attemptParse(input)); + } + + private ThrowingRunnable attemptParse(String alias) { + return () -> BeamSqlEnv.inMemory().isDdl(String.format("SELECT 321 AS %s", alias)); + } +} diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/CalciteParsesSimpleIdentifiersTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/CalciteParsesSimpleIdentifiersTest.java new file mode 100644 index 0000000..2532c02 --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/CalciteParsesSimpleIdentifiersTest.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertEquals; + +import java.io.Serializable; +import java.util.Arrays; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +/** + * Examples of simple identifiers that Calcite is able to parse. + * + * <p>Not an exhaustive list. 
+ */ +@RunWith(Parameterized.class) +public class CalciteParsesSimpleIdentifiersTest implements Serializable { + + @Rule public final transient TestPipeline pipeline = TestPipeline.create(); + private final String input; + private final String expected; + + @Parameters(name = "{0}") + public static Iterable<Object[]> data() { + return Arrays.asList( + new Object[][] { + // -------------------------------- + // user input | parsed as | + // -------------------------------- + {"field_id", "field_id"}, + {"`field_id`", "field_id"}, + {"`field``id`", "field`id"}, + {"`field id`", "field id"}, + {"`field-id`", "field-id"}, + {"`field=id`", "field=id"}, + {"`field.id`", "field.id"}, + {"`field{id}`", "field{id}"}, + {"`field|id`", "field|id"}, + {"`field\\id`", "field\\id"}, + {"`field\\a_id`", "field\\a_id"}, + {"`field\b_id`", "field\b_id"}, + {"`field\\b_id`", "field\\b_id"}, + {"`field\\f_id`", "field\\f_id"}, + {"`field\\n_id`", "field\\n_id"}, + {"`field\\r_id`", "field\\r_id"}, + {"`field\tid`", "field\tid"}, + {"`field\\t_id`", "field\\t_id"}, + {"`field\\v_id`", "field\\v_id"}, + {"`field\\\\_id`", "field\\\\_id"}, + {"`field\\?_id`", "field\\?_id"} + }); + } + + public CalciteParsesSimpleIdentifiersTest(String input, String expected) { + this.input = input; + this.expected = expected; + } + + @Test + public void testParsesAlias() { + assertThat(alias(input), parsedAs(expected)); + } + + /** PCollection with a single row with a single field with the specified alias. */ + private PCollection<Row> alias(String alias) { + return pipeline.apply(SqlTransform.query(String.format("SELECT 321 AS %s", alias))); + } + + /** + * Asserts that the specified field alias is parsed as expected. + * + * <p>SQL parser un-escapes the quoted identifiers, for example. 
+ */ + private Matcher<PCollection<Row>> parsedAs(String expected) { + return new BaseMatcher<PCollection<Row>>() { + @Override + public boolean matches(Object actual) { + PCollection<Row> result = (PCollection<Row>) actual; + PAssert.thatSingleton(result).satisfies(assertFieldNameIs(expected)); + pipeline.run(); + return true; + } + + @Override + public void describeTo(Description description) { + description.appendText("field alias matches"); + } + }; + } + + /** Assert that field name of the only field matches the expected value. */ + private SerializableFunction<Row, Void> assertFieldNameIs(String expected) { + return row -> { + assertEquals(expected, onlyField(row).getName()); + return null; + }; + } + + /** Returns the only field in the row. */ + private Schema.Field onlyField(Row row) { + assertEquals(1, row.getFieldCount()); + return row.getSchema().getField(0); + } +} diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/CustomTableResolverTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/CustomTableResolverTest.java new file mode 100644 index 0000000..484e031 --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/CustomTableResolverTest.java @@ -0,0 +1,514 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.meta; + +import static java.util.stream.Collectors.toList; + +import java.io.Serializable; +import java.util.List; +import java.util.Optional; +import javax.annotation.Nullable; +import org.apache.beam.sdk.extensions.sql.BeamSqlTable; +import org.apache.beam.sdk.extensions.sql.SqlTransform; +import org.apache.beam.sdk.extensions.sql.impl.TableName; +import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider; +import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableProvider; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; +import org.joda.time.Duration; +import org.junit.Rule; +import org.junit.Test; + +/** CustomTableResolverTest. */ +public class CustomTableResolverTest implements Serializable { + + @Rule public final transient TestPipeline pipeline = TestPipeline.create(); + + private static final Schema BASIC_SCHEMA = + Schema.builder().addInt32Field("id").addStringField("name").build(); + + /** + * Test table provider with custom name resolution. + * + * <p>Demonstrates how to parse table names as in normal Calcite query syntax, e.g. {@code + * a.b.c.d} and convert them to its own custom table name format {@code a_b_c_d}. 
+ */ + public static class CustomResolutionTestTableProvider extends TestTableProvider + implements CustomTableResolver { + + List<TableName> parsedTableNames = null; + + @Override + public void registerKnownTableNames(List<TableName> tableNames) { + parsedTableNames = tableNames; + } + + @Override + public TableProvider getSubProvider(String name) { + // TODO: implement with trie + + // If 'name' matches a sub-schema/sub-provider we start tracking + // the subsequent calls to getSubProvider(). + // + // Simple table ids and final table lookup + // + // If there is no matching sub-schema then returning null from here indicates + // that 'name' is either not part of this schema or it's a table, not a sub-schema, + // this will be checked right after this in a getTable() call. + // + // Because this is a getSubProvider() call it means Calcite expects + // the sub-schema/sub-provider to be returned, not a table, + // so we only need to check against known compound table identifiers. + // If 'name' acutally represents a simple identifier then it will be checked + // in a 'getTable()' call later. Unless there's the same sub-provider name, + // in which case it's a conflict and we will use the sub-schema and not assume it's a table. + // Calcite does the same. + // + // Here we find if there are any parsed tables that start from 'name' that belong to this + // table provider. + // We then create a fake tracking provider that in a trie-manner collects + // getSubProvider()/getTable() calls by checking whether there are known parsed table names + // matching what Calcite asks us for. + List<TableName> tablesToLookFor = + parsedTableNames.stream() + .filter(TableName::isCompound) + .filter(tableName -> tableName.getPrefix().equals(name)) + .collect(toList()); + + return tablesToLookFor.size() > 0 ? 
new TableNameTrackingProvider(1, tablesToLookFor) : null; + } + + class TableNameTrackingProvider extends TestTableProvider { + int schemaLevel; + List<TableName> tableNames; + + TableNameTrackingProvider(int schemaLevel, List<TableName> tableNames) { + this.schemaLevel = schemaLevel; + this.tableNames = tableNames; + } + + @Override + public TableProvider getSubProvider(String name) { + // Find if any of the parsed table names have 'name' as part + // of their path at current index. + // + // If there are, return a new tracking provider for such tables and incremented index. + // + // If there are none, it means something weird has happened and returning null + // will make Calcite try other schemas. Maybe things will work out. + // + // However since we originally register all parsed table names for the given schema + // in this provider we should only receive a getSubProvider() call for something unknown + // when it's a leaf path element, i.e. actual table name, which will be handled in + // getTable() call. + List<TableName> matchingTables = + tableNames.stream() + .filter(TableName::isCompound) + .filter(tableName -> tableName.getPath().size() > schemaLevel) + .filter(tableName -> tableName.getPath().get(schemaLevel).equals(name)) + .collect(toList()); + + return matchingTables.size() > 0 + ? new TableNameTrackingProvider(schemaLevel + 1, matchingTables) + : null; + } + + @Nullable + @Override + public Table getTable(String name) { + + // This is called only after getSubProvider() returned null, + // and since we are tracking the actual parsed table names, this should + // be it, there should exist a parsed table that matches the 'name'. + + Optional<TableName> matchingTable = + tableNames.stream() + .filter(tableName -> tableName.getTableName().equals(name)) + .findFirst(); + + TableName tableName = + matchingTable.orElseThrow( + () -> + new IllegalStateException( + "Unexpected table '" + + name + + "' requested. 
Current schema level is " + + schemaLevel + + ". Current known table names: " + + tableNames.toString())); + // For test we register tables with underscore instead of dots, so here we lookup the tables + // with those underscore + String actualTableName = + String.join("_", tableName.getPath()) + "_" + tableName.getTableName(); + return CustomResolutionTestTableProvider.this.getTable(actualTableName); + } + + @Override + public synchronized BeamSqlTable buildBeamSqlTable(Table table) { + return CustomResolutionTestTableProvider.this.buildBeamSqlTable(table); + } + } + } + + @Test + public void testSimpleId() throws Exception { + TestTableProvider tableProvider = new CustomResolutionTestTableProvider(); + tableProvider.createTable( + Table.builder().name("testtable").schema(BASIC_SCHEMA).type("test").build()); + tableProvider.addRows("testtable", row(1, "one"), row(2, "two")); + + PCollection<Row> result = + pipeline.apply( + SqlTransform.query("SELECT id, name FROM testtable") + .withDefaultTableProvider("testprovider", tableProvider)); + + PAssert.that(result).containsInAnyOrder(row(1, "one"), row(2, "two")); + + pipeline.run().waitUntilFinish(Duration.standardMinutes(2)); + } + + @Test + public void testSimpleIdWithExplicitDefaultSchema() throws Exception { + TestTableProvider tableProvider = new CustomResolutionTestTableProvider(); + tableProvider.createTable( + Table.builder().name("testtable").schema(BASIC_SCHEMA).type("test").build()); + tableProvider.addRows("testtable", row(1, "one"), row(2, "two")); + + PCollection<Row> result = + pipeline.apply( + SqlTransform.query("SELECT id, name FROM testprovider.testtable") + .withDefaultTableProvider("testprovider", tableProvider)); + + PAssert.that(result).containsInAnyOrder(row(1, "one"), row(2, "two")); + + pipeline.run().waitUntilFinish(Duration.standardMinutes(2)); + } + + @Test + public void testSimpleIdWithExplicitDefaultSchemaWithMultipleProviders() throws Exception { + TestTableProvider tableProvider = new 
CustomResolutionTestTableProvider(); + tableProvider.createTable( + Table.builder().name("testtable").schema(BASIC_SCHEMA).type("test").build()); + tableProvider.addRows("testtable", row(1, "one"), row(2, "two")); + + TestTableProvider tableProvider2 = new CustomResolutionTestTableProvider(); + tableProvider2.createTable( + Table.builder().name("testtable2").schema(BASIC_SCHEMA).type("test").build()); + tableProvider2.addRows("testtable2", row(3, "three"), row(4, "four")); + + PCollection<Row> result = + pipeline.apply( + SqlTransform.query("SELECT id, name FROM testprovider2.testtable2") + .withTableProvider("testprovider2", tableProvider2) + .withDefaultTableProvider("testprovider", tableProvider)); + + PAssert.that(result).containsInAnyOrder(row(3, "three"), row(4, "four")); + + pipeline.run().waitUntilFinish(Duration.standardMinutes(2)); + } + + @Test + public void testSimpleIdWithExplicitNonDefaultSchema() throws Exception { + TestTableProvider tableProvider = new CustomResolutionTestTableProvider(); + tableProvider.createTable( + Table.builder().name("testtable").schema(BASIC_SCHEMA).type("test").build()); + tableProvider.addRows("testtable", row(1, "one"), row(2, "two")); + + TestTableProvider tableProvider2 = new CustomResolutionTestTableProvider(); + tableProvider2.createTable( + Table.builder().name("testtable2").schema(BASIC_SCHEMA).type("test").build()); + tableProvider2.addRows("testtable2", row(3, "three"), row(4, "four")); + + PCollection<Row> result = + pipeline.apply( + SqlTransform.query("SELECT id, name FROM testprovider2.testtable2") + .withTableProvider("testprovider2", tableProvider2) + .withDefaultTableProvider("testprovider", tableProvider)); + + PAssert.that(result).containsInAnyOrder(row(3, "three"), row(4, "four")); + + pipeline.run().waitUntilFinish(Duration.standardMinutes(2)); + } + + @Test + public void testCompoundIdInDefaultSchema() throws Exception { + TestTableProvider tableProvider = new CustomResolutionTestTableProvider(); + 
tableProvider.createTable( + Table.builder().name("testtable_blah").schema(BASIC_SCHEMA).type("test").build()); + tableProvider.addRows("testtable_blah", row(1, "one"), row(2, "two")); + + PCollection<Row> result = + pipeline.apply( + SqlTransform.query("SELECT id, name FROM testtable.blah") + .withDefaultTableProvider("testprovider", tableProvider)); + + PAssert.that(result).containsInAnyOrder(row(1, "one"), row(2, "two")); + + pipeline.run().waitUntilFinish(Duration.standardMinutes(2)); + } + + @Test + public void testCompoundIdInExplicitDefaultSchema() throws Exception { + TestTableProvider tableProvider = new CustomResolutionTestTableProvider(); + tableProvider.createTable( + Table.builder().name("testtable_blah").schema(BASIC_SCHEMA).type("test").build()); + tableProvider.addRows("testtable_blah", row(1, "one"), row(2, "two")); + + PCollection<Row> result = + pipeline.apply( + SqlTransform.query("SELECT id, name FROM testprovider.testtable.blah") + .withDefaultTableProvider("testprovider", tableProvider)); + + PAssert.that(result).containsInAnyOrder(row(1, "one"), row(2, "two")); + + pipeline.run().waitUntilFinish(Duration.standardMinutes(2)); + } + + @Test + public void testLongCompoundIdInDefaultSchema() throws Exception { + TestTableProvider tableProvider = new CustomResolutionTestTableProvider(); + tableProvider.createTable( + Table.builder().name("testtable_blah_foo_bar").schema(BASIC_SCHEMA).type("test").build()); + tableProvider.addRows("testtable_blah_foo_bar", row(1, "one"), row(2, "two")); + + PCollection<Row> result = + pipeline.apply( + SqlTransform.query("SELECT id, name FROM testtable.blah.foo.bar") + .withDefaultTableProvider("testprovider", tableProvider)); + + PAssert.that(result).containsInAnyOrder(row(1, "one"), row(2, "two")); + + pipeline.run().waitUntilFinish(Duration.standardMinutes(2)); + } + + @Test + public void testLongCompoundIdInDefaultSchemaWithMultipleProviders() throws Exception { + TestTableProvider tableProvider = new 
CustomResolutionTestTableProvider(); + tableProvider.createTable( + Table.builder().name("testtable_blah_foo_bar").schema(BASIC_SCHEMA).type("test").build()); + tableProvider.addRows("testtable_blah_foo_bar", row(1, "one"), row(2, "two")); + + TestTableProvider tableProvider2 = new CustomResolutionTestTableProvider(); + tableProvider2.createTable( + Table.builder().name("testtable_blah_foo_bar").schema(BASIC_SCHEMA).type("test").build()); + tableProvider2.addRows("testtable_blah_foo_bar", row(3, "three"), row(4, "four")); + + PCollection<Row> result = + pipeline.apply( + SqlTransform.query("SELECT id, name FROM testtable.blah.foo.bar") + .withTableProvider("testprovider2", tableProvider2) + .withDefaultTableProvider("testprovider", tableProvider)); + + PAssert.that(result).containsInAnyOrder(row(1, "one"), row(2, "two")); + + pipeline.run().waitUntilFinish(Duration.standardMinutes(2)); + } + + @Test + public void testLongCompoundIdInExplicitDefaultSchema() throws Exception { + TestTableProvider tableProvider = new CustomResolutionTestTableProvider(); + tableProvider.createTable( + Table.builder().name("testtable_blah_foo_bar").schema(BASIC_SCHEMA).type("test").build()); + tableProvider.addRows("testtable_blah_foo_bar", row(1, "one"), row(2, "two")); + + PCollection<Row> result = + pipeline.apply( + SqlTransform.query("SELECT id, name FROM testprovider.testtable.blah.foo.bar") + .withDefaultTableProvider("testprovider", tableProvider)); + + PAssert.that(result).containsInAnyOrder(row(1, "one"), row(2, "two")); + + pipeline.run().waitUntilFinish(Duration.standardMinutes(2)); + } + + @Test + public void testLongCompoundIdInNonDefaultSchemaSameTableNames() throws Exception { + TestTableProvider tableProvider = new CustomResolutionTestTableProvider(); + tableProvider.createTable( + Table.builder().name("testtable_blah_foo_bar").schema(BASIC_SCHEMA).type("test").build()); + tableProvider.addRows("testtable_blah_foo_bar", row(1, "one"), row(2, "two")); + + 
TestTableProvider tableProvider2 = new CustomResolutionTestTableProvider(); + tableProvider2.createTable( + Table.builder().name("testtable_blah_foo_bar").schema(BASIC_SCHEMA).type("test").build()); + tableProvider2.addRows("testtable_blah_foo_bar", row(3, "three"), row(4, "four")); + + PCollection<Row> result = + pipeline.apply( + SqlTransform.query("SELECT id, name FROM testprovider2.testtable.blah.foo.bar") + .withTableProvider("testprovider2", tableProvider2) + .withDefaultTableProvider("testprovider", tableProvider)); + + PAssert.that(result).containsInAnyOrder(row(3, "three"), row(4, "four")); + + pipeline.run().waitUntilFinish(Duration.standardMinutes(2)); + } + + @Test + public void testLongCompoundIdInNonDefaultSchemaDifferentNames() throws Exception { + TestTableProvider tableProvider = new CustomResolutionTestTableProvider(); + tableProvider.createTable( + Table.builder().name("testtable_blah_foo_bar").schema(BASIC_SCHEMA).type("test").build()); + tableProvider.addRows("testtable_blah_foo_bar", row(1, "one"), row(2, "two")); + + TestTableProvider tableProvider2 = new CustomResolutionTestTableProvider(); + tableProvider2.createTable( + Table.builder() + .name("testtable2_blah2_foo2_bar2") + .schema(BASIC_SCHEMA) + .type("test") + .build()); + tableProvider2.addRows("testtable2_blah2_foo2_bar2", row(3, "three"), row(4, "four")); + + PCollection<Row> result = + pipeline.apply( + SqlTransform.query("SELECT id, name FROM testprovider2.testtable2.blah2.foo2.bar2") + .withTableProvider("testprovider2", tableProvider2) + .withDefaultTableProvider("testprovider", tableProvider)); + + PAssert.that(result).containsInAnyOrder(row(3, "three"), row(4, "four")); + + pipeline.run().waitUntilFinish(Duration.standardMinutes(2)); + } + + @Test + public void testJoinWithLongCompoundIds() throws Exception { + TestTableProvider tableProvider = new CustomResolutionTestTableProvider(); + tableProvider.createTable( + 
Table.builder().name("testtable_blah_foo_bar").schema(BASIC_SCHEMA).type("test").build()); + tableProvider.addRows("testtable_blah_foo_bar", row(3, "customer"), row(2, "nobody")); + + TestTableProvider tableProvider2 = new CustomResolutionTestTableProvider(); + tableProvider2.createTable( + Table.builder().name("testtable_blah_foo_bar2").schema(BASIC_SCHEMA).type("test").build()); + tableProvider2.addRows("testtable_blah_foo_bar2", row(4, "customer"), row(1, "nobody")); + + PCollection<Row> result = + pipeline.apply( + SqlTransform.query( + "SELECT testprovider2.testtable.blah.foo.bar2.id, testtable.blah.foo.bar.name \n" + + "FROM \n" + + " testprovider2.testtable.blah.foo.bar2 \n" + + "JOIN \n" + + " testtable.blah.foo.bar \n" + + "USING(name)") + .withTableProvider("testprovider2", tableProvider2) + .withDefaultTableProvider("testprovider", tableProvider)); + + PAssert.that(result).containsInAnyOrder(row(4, "customer"), row(1, "nobody")); + + pipeline.run().waitUntilFinish(Duration.standardMinutes(2)); + } + + @Test + public void testInnerJoinWithLongCompoundIds() throws Exception { + TestTableProvider tableProvider = new CustomResolutionTestTableProvider(); + tableProvider.createTable( + Table.builder().name("testtable_blah_foo_bar").schema(BASIC_SCHEMA).type("test").build()); + tableProvider.addRows("testtable_blah_foo_bar", row(3, "customer"), row(2, "nobody")); + + TestTableProvider tableProvider2 = new CustomResolutionTestTableProvider(); + tableProvider2.createTable( + Table.builder().name("testtable_blah_foo_bar2").schema(BASIC_SCHEMA).type("test").build()); + tableProvider2.addRows("testtable_blah_foo_bar2", row(4, "customer"), row(1, "nobody")); + + PCollection<Row> result = + pipeline.apply( + SqlTransform.query( + "SELECT testprovider2.testtable.blah.foo.bar2.id, testtable.blah.foo.bar.name \n" + + "FROM \n" + + " testprovider2.testtable.blah.foo.bar2 \n" + + "JOIN \n" + + " testtable.blah.foo.bar \n" + + "USING(name)") + 
.withTableProvider("testprovider2", tableProvider2) + .withDefaultTableProvider("testprovider", tableProvider)); + + PAssert.that(result).containsInAnyOrder(row(4, "customer"), row(1, "nobody")); + + pipeline.run().waitUntilFinish(Duration.standardMinutes(2)); + } + + @Test + public void testJoinWithLongCompoundIdsWithAliases() throws Exception { + TestTableProvider tableProvider = new CustomResolutionTestTableProvider(); + tableProvider.createTable( + Table.builder().name("testtable_blah_foo_bar").schema(BASIC_SCHEMA).type("test").build()); + tableProvider.addRows("testtable_blah_foo_bar", row(3, "customer"), row(2, "nobody")); + + TestTableProvider tableProvider2 = new CustomResolutionTestTableProvider(); + tableProvider2.createTable( + Table.builder().name("testtable_blah_foo_bar2").schema(BASIC_SCHEMA).type("test").build()); + tableProvider2.addRows("testtable_blah_foo_bar2", row(4, "customer"), row(1, "nobody")); + + PCollection<Row> result = + pipeline.apply( + SqlTransform.query( + "SELECT b.id, a.name \n" + + "FROM \n" + + " testprovider2.testtable.blah.foo.bar2 AS b \n" + + "JOIN \n" + + " testtable.blah.foo.bar a\n" + + "USING(name)") + .withTableProvider("testprovider2", tableProvider2) + .withDefaultTableProvider("testprovider", tableProvider)); + + PAssert.that(result).containsInAnyOrder(row(4, "customer"), row(1, "nobody")); + + pipeline.run().waitUntilFinish(Duration.standardMinutes(2)); + } + + @Test + public void testUnionWithLongCompoundIds() throws Exception { + TestTableProvider tableProvider = new CustomResolutionTestTableProvider(); + tableProvider.createTable( + Table.builder().name("testtable_blah_foo_bar").schema(BASIC_SCHEMA).type("test").build()); + tableProvider.addRows("testtable_blah_foo_bar", row(3, "customer"), row(2, "nobody")); + + TestTableProvider tableProvider2 = new CustomResolutionTestTableProvider(); + tableProvider2.createTable( + Table.builder().name("testtable_blah_foo_bar2").schema(BASIC_SCHEMA).type("test").build()); + 
tableProvider2.addRows("testtable_blah_foo_bar2", row(4, "customer"), row(1, "nobody")); + + PCollection<Row> result = + pipeline.apply( + SqlTransform.query( + "SELECT id, name \n" + + "FROM \n" + + " testprovider2.testtable.blah.foo.bar2 \n" + + "UNION \n" + + " SELECT id, name \n" + + " FROM \n" + + " testtable.blah.foo.bar \n") + .withTableProvider("testprovider2", tableProvider2) + .withDefaultTableProvider("testprovider", tableProvider)); + + PAssert.that(result) + .containsInAnyOrder( + row(4, "customer"), row(1, "nobody"), row(3, "customer"), row(2, "nobody")); + + pipeline.run().waitUntilFinish(Duration.standardMinutes(2)); + } + + private Row row(int id, String name) { + return Row.withSchema(BASIC_SCHEMA).addValues(id, name).build(); + } +}