Add stream-table join support for Samza SQL

Author: Aditya Toomula <[email protected]>
Reviewers: Yi Pan <[email protected]> Closes #425 from atoomula/join Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/956cf412 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/956cf412 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/956cf412 Branch: refs/heads/master Commit: 956cf412a44812d54286fd9a0c4d167239387362 Parents: 2d7b0f5 Author: Aditya Toomula <[email protected]> Authored: Wed Mar 21 17:58:07 2018 -0700 Committer: xiliu <[email protected]> Committed: Wed Mar 21 17:58:07 2018 -0700 ---------------------------------------------------------------------- build.gradle | 5 +- .../samza/runtime/LocalApplicationRunner.java | 1 + .../org/apache/samza/config/StreamConfig.scala | 1 + .../samza/sql/data/SamzaSqlCompositeKey.java | 81 ++++ .../sql/data/SamzaSqlExecutionContext.java | 4 + .../samza/sql/data/SamzaSqlRelMessage.java | 29 +- .../impl/ConfigBasedSourceResolverFactory.java | 50 ++- .../samza/sql/interfaces/SourceResolver.java | 14 +- .../sql/interfaces/SqlSystemSourceConfig.java | 129 +++++++ .../sql/interfaces/SqlSystemStreamConfig.java | 117 ------ .../apache/samza/sql/planner/QueryPlanner.java | 12 +- .../sql/runner/SamzaSqlApplicationConfig.java | 21 +- .../sql/runner/SamzaSqlApplicationRunner.java | 6 +- .../samza/sql/testutil/SamzaSqlQueryParser.java | 31 +- .../samza/sql/translator/FilterTranslator.java | 20 +- .../samza/sql/translator/JoinTranslator.java | 279 ++++++++++++++ .../samza/sql/translator/ProjectTranslator.java | 4 +- .../samza/sql/translator/QueryTranslator.java | 16 +- .../SamzaSqlRelMessageJoinFunction.java | 121 ++++++ .../samza/sql/translator/ScanTranslator.java | 12 +- .../apache/samza/sql/TestQueryTranslator.java | 370 +++++++++++++++++- .../sql/TestSamzaSqlApplicationConfig.java | 8 +- .../samza/sql/TestSamzaSqlQueryParser.java | 24 +- .../sql/TestSamzaSqlRelMessageJoinFunction.java | 116 ++++++ .../samza/sql/TestSamzaSqlRelMessageSerde.java | 43 
+++ .../apache/samza/sql/avro/schemas/Company.avsc | 39 ++ .../apache/samza/sql/avro/schemas/Company.java | 52 +++ .../sql/avro/schemas/EnrichedPageView.avsc | 45 +++ .../sql/avro/schemas/EnrichedPageView.java | 56 +++ .../apache/samza/sql/avro/schemas/PageView.avsc | 39 ++ .../apache/samza/sql/avro/schemas/PageView.java | 52 +++ .../apache/samza/sql/avro/schemas/Profile.avsc | 45 +++ .../apache/samza/sql/avro/schemas/Profile.java | 56 +++ .../samza/sql/e2e/TestSamzaSqlEndToEnd.java | 153 -------- .../samza/sql/system/SimpleSystemAdmin.java | 11 +- .../samza/sql/system/TestAvroSystemFactory.java | 95 ++++- .../samza/sql/testutil/SamzaSqlTestConfig.java | 55 ++- .../sql/testutil/TestSourceResolverFactory.java | 26 +- samza-sql/src/test/resources/log4j.xml | 6 + .../test/samzasql/TestSamzaSqlEndToEnd.java | 385 +++++++++++++++++++ .../org/apache/samza/tools/SamzaSqlConsole.java | 10 +- 41 files changed, 2256 insertions(+), 383 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index 44a6ccd..d96ec96 100644 --- a/build.gradle +++ b/build.gradle @@ -316,11 +316,12 @@ project(':samza-sql') { dependencies { compile project(':samza-api') compile project(":samza-kafka_$scalaVersion") + compile project(":samza-kv-inmemory_$scalaVersion") + compile project(":samza-kv-rocksdb_$scalaVersion") compile "org.apache.avro:avro:$avroVersion" compile "org.apache.calcite:calcite-core:$calciteVersion" compile "org.slf4j:slf4j-api:$slf4jVersion" - testCompile project(":samza-test_$scalaVersion") testCompile "junit:junit:$junitVersion" testCompile "org.mockito:mockito-core:$mockitoVersion" @@ -753,6 +754,7 @@ project(":samza-test_$scalaVersion") { compile project(":samza-kv-inmemory_$scalaVersion") compile project(":samza-kv-rocksdb_$scalaVersion") compile 
project(":samza-core_$scalaVersion") + compile project(":samza-sql") runtime project(":samza-log4j") runtime project(":samza-yarn_$scalaVersion") runtime project(":samza-kafka_$scalaVersion") @@ -769,6 +771,7 @@ project(":samza-test_$scalaVersion") { testCompile "org.apache.kafka:kafka_$scalaVersion:$kafkaVersion:test" testCompile "org.apache.kafka:kafka-clients:$kafkaVersion:test" testCompile project(":samza-core_$scalaVersion").sourceSets.test.output + testCompile project(":samza-sql").sourceSets.test.output testCompile "org.scalatest:scalatest_$scalaVersion:$scalaTestVersion" testCompile "org.mockito:mockito-core:$mockitoVersion" testRuntime "org.slf4j:slf4j-simple:$slf4jVersion" http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java index 5c5ee84..9529581 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java @@ -153,6 +153,7 @@ public class LocalApplicationRunner extends AbstractApplicationRunner { String executionPlanJson = plan.getPlanAsJson(); writePlanJsonFile(executionPlanJson); + LOG.info("Execution Plan: \n" + executionPlanJson); // 2. create the necessary streams // TODO: System generated intermediate streams should have robust naming scheme. 
See SAMZA-1391 http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala index 31f9b92..db86969 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala +++ b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala @@ -50,6 +50,7 @@ object StreamConfig { val IS_BOUNDED_FOR_STREAM_ID = STREAM_ID_PREFIX + IS_BOUNDED val PRIORITY_FOR_STREAM_ID = STREAM_ID_PREFIX + PRIORITY val CONSUMER_OFFSET_DEFAULT_FOR_STREAM_ID = STREAM_ID_PREFIX + CONSUMER_OFFSET_DEFAULT + val BOOTSTRAP_FOR_STREAM_ID = STREAM_ID_PREFIX + BOOTSTRAP implicit def Config2Stream(config: Config) = new StreamConfig(config) } http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlCompositeKey.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlCompositeKey.java b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlCompositeKey.java new file mode 100644 index 0000000..f646d9a --- /dev/null +++ b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlCompositeKey.java @@ -0,0 +1,81 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. 
You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package org.apache.samza.sql.data; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import org.codehaus.jackson.annotate.JsonCreator; +import org.codehaus.jackson.annotate.JsonProperty; + + +/** + * A serializable class that holds different key parts. + */ +public class SamzaSqlCompositeKey implements Serializable { + + @JsonProperty("keyParts") + private ArrayList<Object> keyParts; + private int hashCode; + + @JsonCreator + public SamzaSqlCompositeKey(@JsonProperty("keyParts") List<Object> keyParts) { + this.keyParts = new ArrayList<>(keyParts); + hashCode = keyParts.hashCode(); + } + + /** + * Get the keyParts of all the columns in the relational message. + * @return the keyParts of all the columns + */ + @JsonProperty("keyParts") + public ArrayList<Object> getKeyParts() { + return keyParts; + } + + @Override + public String toString() { + return String.join(", ", Arrays.toString(keyParts.toArray())); + } + + @Override + public int hashCode() { + return hashCode; + } + + @Override + public boolean equals(Object o) { + return this == o || o != null && getClass() == o.getClass() && keyParts.equals(((SamzaSqlCompositeKey) o).keyParts); + } + + /** + * Create the SamzaSqlCompositeKey from the rel message. + * @param message Represents the samza sql rel message. + * @param relIdx list of keys in the form of field indices within the rel message. 
+ */ + public static SamzaSqlCompositeKey createSamzaSqlCompositeKey(SamzaSqlRelMessage message, List<Integer> relIdx) { + ArrayList<Object> keyParts = new ArrayList<>(); + for (int idx : relIdx) { + keyParts.add(message.getFieldValues().get(idx)); + } + return new SamzaSqlCompositeKey(keyParts); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlExecutionContext.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlExecutionContext.java b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlExecutionContext.java index 88bcb61..b0c30dd 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlExecutionContext.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlExecutionContext.java @@ -58,4 +58,8 @@ public class SamzaSqlExecutionContext { scalarUdf.init(udfConfig); return scalarUdf; } + + public SamzaSqlApplicationConfig getSamzaSqlApplicationConfig() { + return sqlConfig; + } } http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java index 452a32c..b54634f 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java @@ -19,10 +19,12 @@ package org.apache.samza.sql.data; +import java.io.Serializable; import java.util.ArrayList; import java.util.List; import java.util.Optional; import org.apache.commons.lang.Validate; +import org.codehaus.jackson.annotate.JsonProperty; /** @@ -31,15 +33,19 @@ import org.apache.commons.lang.Validate; * their associated column 
names. Right now we donot store any other metadata other than the column name in the * SamzaSqlRelationalMessage, In future if we find a need, we could add additional column ddl metadata around * primary Key, nullability, etc. + * TODO: SAMZA-1619 Support serialization of nested SamzaSqlRelMessage. */ -public class SamzaSqlRelMessage { +public class SamzaSqlRelMessage implements Serializable { public static final String KEY_NAME = "__key__"; - private final List<Object> fieldValues = new ArrayList<>(); - private final List<String> fieldNames = new ArrayList<>(); private final Object key; + @JsonProperty("fieldNames") + private final List<String> fieldNames; + @JsonProperty("fieldValues") + private final List<Object> fieldValues; + /** * Creates a {@link SamzaSqlRelMessage} from the list of relational fields and values. * If the field list contains KEY, then it extracts the key out of the fields to creates a @@ -49,16 +55,20 @@ public class SamzaSqlRelMessage { * delete change capture event in the stream or because of the result of the outer join or the fields * themselves are null in the original stream. 
*/ - public SamzaSqlRelMessage(List<String> fieldNames, List<Object> fieldValues) { + public SamzaSqlRelMessage(@JsonProperty("fieldNames") List<String> fieldNames, + @JsonProperty("fieldValues") List<Object> fieldValues) { Validate.isTrue(fieldNames.size() == fieldValues.size(), "Field Names and values are not of same length."); + this.fieldNames = new ArrayList<>(); + this.fieldValues = new ArrayList<>(); + int keyIndex = fieldNames.indexOf(KEY_NAME); Object key = null; if (keyIndex != -1) { key = fieldValues.get(keyIndex); } - this.key = key; + this.fieldNames.addAll(fieldNames); this.fieldValues.addAll(fieldValues); } @@ -74,10 +84,15 @@ public class SamzaSqlRelMessage { */ public SamzaSqlRelMessage(Object key, List<String> fieldNames, List<Object> fieldValues) { Validate.isTrue(fieldNames.size() == fieldValues.size(), "Field Names and values are not of same length."); + + this.fieldNames = new ArrayList<>(); + this.fieldValues = new ArrayList<>(); + this.key = key; this.fieldNames.add(KEY_NAME); - this.fieldNames.addAll(fieldNames); this.fieldValues.add(key); + + this.fieldNames.addAll(fieldNames); this.fieldValues.addAll(fieldValues); } @@ -85,10 +100,12 @@ public class SamzaSqlRelMessage { * Get the field names of all the columns in the relational message. * @return the field names of all columns. 
*/ + @JsonProperty("fieldNames") public List<String> getFieldNames() { return fieldNames; } + @JsonProperty("fieldValues") public List<Object> getFieldValues() { return this.fieldValues; } http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedSourceResolverFactory.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedSourceResolverFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedSourceResolverFactory.java index a2d8b0c..5348d3d 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedSourceResolverFactory.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedSourceResolverFactory.java @@ -23,14 +23,16 @@ import org.apache.samza.SamzaException; import org.apache.samza.config.Config; import org.apache.samza.sql.interfaces.SourceResolver; import org.apache.samza.sql.interfaces.SourceResolverFactory; -import org.apache.samza.sql.interfaces.SqlSystemStreamConfig; +import org.apache.samza.sql.interfaces.SqlSystemSourceConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Source Resolver implementation that uses static config to return a config corresponding to a system stream. 
- * This Source resolver implementation supports sources of type {systemName}.{streamName} + * This Source resolver implementation supports sources of type {systemName}.{streamName}[.$table] + * {systemName}.{streamName} indicates a stream + * {systemName}.{streamName}.$table indicates a table */ public class ConfigBasedSourceResolverFactory implements SourceResolverFactory { @@ -44,6 +46,7 @@ public class ConfigBasedSourceResolverFactory implements SourceResolverFactory { } private class ConfigBasedSourceResolver implements SourceResolver { + private final String SAMZA_SQL_QUERY_TABLE_KEYWORD = "$table"; private final Config config; public ConfigBasedSourceResolver(Config config) { @@ -51,19 +54,52 @@ public class ConfigBasedSourceResolverFactory implements SourceResolverFactory { } @Override - public SqlSystemStreamConfig fetchSourceInfo(String source) { + public SqlSystemSourceConfig fetchSourceInfo(String source) { String[] sourceComponents = source.split("\\."); + boolean isTable = false; + + // This source resolver expects sources of format {systemName}.{streamName}[.$table] + // * First source part is always system name. + // * The last source part could be either a "$table" keyword or stream name. If it is "$table", then stream name + // should be the one before the last source part. 
+ int endIdx = sourceComponents.length - 1; + int streamIdx = endIdx; + boolean invalidQuery = false; - // This source resolver expects sources of format {systemName}.{streamName} if (sourceComponents.length != 2) { - String msg = String.format("Source %s is not of the format {systemName}.{streamName{", source); + if (sourceComponents.length != 3 || + !sourceComponents[endIdx].equalsIgnoreCase(SAMZA_SQL_QUERY_TABLE_KEYWORD)) { + invalidQuery = true; + } + } else { + if (sourceComponents[0].equalsIgnoreCase(SAMZA_SQL_QUERY_TABLE_KEYWORD) || + sourceComponents[1].equalsIgnoreCase(SAMZA_SQL_QUERY_TABLE_KEYWORD)) { + invalidQuery = true; + } + } + + if (invalidQuery) { + String msg = String.format("Source %s is not of the format {systemName}.{streamName}[.%s]", source, + SAMZA_SQL_QUERY_TABLE_KEYWORD); LOG.error(msg); throw new SamzaException(msg); } + + if (sourceComponents[endIdx].equalsIgnoreCase(SAMZA_SQL_QUERY_TABLE_KEYWORD)) { + isTable = true; + streamIdx = endIdx - 1; + } + String systemName = sourceComponents[0]; - String streamName = sourceComponents[1]; + String streamName = sourceComponents[streamIdx]; + + return new SqlSystemSourceConfig(systemName, streamName, fetchSystemConfigs(systemName), isTable); + } - return new SqlSystemStreamConfig(systemName, streamName, fetchSystemConfigs(systemName)); + @Override + public boolean isTable(String sourceName) { + String[] sourceComponents = sourceName.split("\\."); + return sourceComponents[sourceComponents.length - 1].equalsIgnoreCase(SAMZA_SQL_QUERY_TABLE_KEYWORD); } private Config fetchSystemConfigs(String systemName) { http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SourceResolver.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SourceResolver.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SourceResolver.java index ac3fd31..c161a0d 
100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SourceResolver.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SourceResolver.java @@ -20,7 +20,7 @@ package org.apache.samza.sql.interfaces; /** - * Source Resolvers are used by Samza Sql application to fetch the {@link SqlSystemStreamConfig} corresponding to the source. + * Source Resolvers are used by Samza Sql application to fetch the {@link SqlSystemSourceConfig} corresponding to the source. */ public interface SourceResolver { /** @@ -30,5 +30,15 @@ public interface SourceResolver { * @return * System stream config corresponding to the source. */ - SqlSystemStreamConfig fetchSourceInfo(String sourceName); + SqlSystemSourceConfig fetchSourceInfo(String sourceName); + + /** + * Returns if a given source is a table. Different source resolvers could have different notations in the source + * name for denoting a table. Eg: system.stream.$table + * @param sourceName + * source that needs to be checked if it is a table. + * @return + * true if the source is a table, else false. + */ + boolean isTable(String sourceName); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlSystemSourceConfig.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlSystemSourceConfig.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlSystemSourceConfig.java new file mode 100644 index 0000000..02ec18a --- /dev/null +++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlSystemSourceConfig.java @@ -0,0 +1,129 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. 
The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package org.apache.samza.sql.interfaces; + +import com.google.common.base.Joiner; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import org.apache.commons.lang.Validate; +import org.apache.samza.config.Config; +import org.apache.samza.config.MapConfig; +import org.apache.samza.config.StreamConfig; +import org.apache.samza.system.SystemStream; + + +/** + * Configs associated with a system source. Both streams and table sources are supported. + * For now, only local tables are supported. 
+ */ +public class SqlSystemSourceConfig { + + public static final String CFG_SAMZA_REL_CONVERTER = "samzaRelConverterName"; + public static final String CFG_REL_SCHEMA_PROVIDER = "relSchemaProviderName"; + + private final String systemName; + + private final String streamName; + + private final String samzaRelConverterName; + private final SystemStream systemStream; + + private final String source; + private String relSchemaProviderName; + + private Config config; + + private List<String> sourceParts; + + public SqlSystemSourceConfig(String systemName, String streamName, Config systemConfig) { + this(systemName, streamName, Arrays.asList(systemName, streamName), systemConfig, false); + } + + public SqlSystemSourceConfig(String systemName, String streamName, Config systemConfig, boolean isTable) { + this(systemName, streamName, Arrays.asList(systemName, streamName), systemConfig, isTable); + } + + public SqlSystemSourceConfig(String systemName, String streamName, List<String> sourceParts, + Config systemConfig, boolean isTable) { + + + HashMap<String, String> streamConfigs = new HashMap<>(systemConfig); + this.systemName = systemName; + this.streamName = streamName; + this.source = getSourceFromSourceParts(sourceParts); + this.sourceParts = sourceParts; + this.systemStream = new SystemStream(systemName, streamName); + + samzaRelConverterName = streamConfigs.get(CFG_SAMZA_REL_CONVERTER); + Validate.notEmpty(samzaRelConverterName, + String.format("%s is not set or empty for system %s", CFG_SAMZA_REL_CONVERTER, systemName)); + + relSchemaProviderName = streamConfigs.get(CFG_REL_SCHEMA_PROVIDER); + + // Removing the Samza SQL specific configs to get the remaining Samza configs. + streamConfigs.remove(CFG_SAMZA_REL_CONVERTER); + streamConfigs.remove(CFG_REL_SCHEMA_PROVIDER); + + // Currently, only local table is supported. And it is assumed that all tables are local tables. 
+ if (isTable) { + streamConfigs.put(String.format(StreamConfig.BOOTSTRAP_FOR_STREAM_ID(), streamName), "true"); + streamConfigs.put(String.format(StreamConfig.CONSUMER_OFFSET_DEFAULT_FOR_STREAM_ID(), streamName), "oldest"); + } + + config = new MapConfig(streamConfigs); + } + + public static String getSourceFromSourceParts(List<String> sourceParts) { + return Joiner.on(".").join(sourceParts); + } + + public List<String> getSourceParts() { + return sourceParts; + } + + public String getSystemName() { + return systemName; + } + + public String getStreamName() { + return streamName; + } + + public String getSamzaRelConverterName() { + return samzaRelConverterName; + } + + public String getRelSchemaProviderName() { + return relSchemaProviderName; + } + + public SystemStream getSystemStream() { + return systemStream; + } + + public Config getConfig() { + return config; + } + + public String getSource() { + return source; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlSystemStreamConfig.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlSystemStreamConfig.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlSystemStreamConfig.java deleted file mode 100644 index d8965a4..0000000 --- a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlSystemStreamConfig.java +++ /dev/null @@ -1,117 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. 
You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -package org.apache.samza.sql.interfaces; - -import com.google.common.base.Joiner; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import org.apache.commons.lang.Validate; -import org.apache.samza.config.Config; -import org.apache.samza.config.MapConfig; -import org.apache.samza.system.SystemStream; - - -/** - * Configs associated with a system stream. - */ -public class SqlSystemStreamConfig { - - public static final String CFG_SAMZA_REL_CONVERTER = "samzaRelConverterName"; - public static final String CFG_REL_SCHEMA_PROVIDER = "relSchemaProviderName"; - - private final String systemName; - - private final String streamName; - - private final String samzaRelConverterName; - private final SystemStream systemStream; - - private final String source; - private String relSchemaProviderName; - - private Config config; - - private List<String> sourceParts; - - public SqlSystemStreamConfig(String systemName, String streamName, Config systemConfig) { - this(systemName, streamName, Arrays.asList(systemName, streamName), systemConfig); - } - - public SqlSystemStreamConfig(String systemName, String streamName, List<String> sourceParts, - Config systemConfig) { - - - HashMap<String, String> streamConfigs = new HashMap<>(systemConfig); - this.systemName = systemName; - this.streamName = streamName; - this.source = getSourceFromSourceParts(sourceParts); - this.sourceParts = sourceParts; - this.systemStream = new SystemStream(systemName, streamName); - - samzaRelConverterName = streamConfigs.get(CFG_SAMZA_REL_CONVERTER); - 
Validate.notEmpty(samzaRelConverterName, - String.format("%s is not set or empty for system %s", CFG_SAMZA_REL_CONVERTER, systemName)); - - relSchemaProviderName = streamConfigs.get(CFG_REL_SCHEMA_PROVIDER); - - // Removing the Samza SQL specific configs to get the remaining Samza configs. - streamConfigs.remove(CFG_SAMZA_REL_CONVERTER); - streamConfigs.remove(CFG_REL_SCHEMA_PROVIDER); - - config = new MapConfig(streamConfigs); - } - - public static String getSourceFromSourceParts(List<String> sourceParts) { - return Joiner.on(".").join(sourceParts); - } - - public List<String> getSourceParts() { - return sourceParts; - } - - public String getSystemName() { - return systemName; - } - - public String getStreamName() { - return streamName; - } - - public String getSamzaRelConverterName() { - return samzaRelConverterName; - } - - public String getRelSchemaProviderName() { - return relSchemaProviderName; - } - - public SystemStream getSystemStream() { - return systemStream; - } - - public Config getConfig() { - return config; - } - - public String getSource() { - return source; - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java b/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java index 2b67f18..f21eccf 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java @@ -55,7 +55,7 @@ import org.apache.calcite.tools.Planner; import org.apache.samza.SamzaException; import org.apache.samza.sql.data.SamzaSqlRelMessage; import org.apache.samza.sql.interfaces.RelSchemaProvider; -import org.apache.samza.sql.interfaces.SqlSystemStreamConfig; +import org.apache.samza.sql.interfaces.SqlSystemSourceConfig; import 
org.apache.samza.sql.interfaces.UdfMetadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,11 +72,11 @@ public class QueryPlanner { // Mapping between the source to the RelSchemaProvider corresponding to the source. private final Map<String, RelSchemaProvider> relSchemaProviders; - // Mapping between the source to the SqlSystemStreamConfig corresponding to the source. - private final Map<String, SqlSystemStreamConfig> systemStreamConfigBySource; + // Mapping between the source to the SqlSystemSourceConfig corresponding to the source. + private final Map<String, SqlSystemSourceConfig> systemStreamConfigBySource; public QueryPlanner(Map<String, RelSchemaProvider> relSchemaProviders, - Map<String, SqlSystemStreamConfig> systemStreamConfigBySource, Collection<UdfMetadata> udfMetadata) { + Map<String, SqlSystemSourceConfig> systemStreamConfigBySource, Collection<UdfMetadata> udfMetadata) { this.relSchemaProviders = relSchemaProviders; this.systemStreamConfigBySource = systemStreamConfigBySource; this.udfMetadata = udfMetadata; @@ -88,7 +88,7 @@ public class QueryPlanner { CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class); SchemaPlus rootSchema = calciteConnection.getRootSchema(); - for (SqlSystemStreamConfig ssc : systemStreamConfigBySource.values()) { + for (SqlSystemSourceConfig ssc : systemStreamConfigBySource.values()) { SchemaPlus previousLevelSchema = rootSchema; List<String> sourceParts = ssc.getSourceParts(); RelSchemaProvider relSchemaProvider = relSchemaProviders.get(ssc.getSource()); @@ -96,7 +96,7 @@ public class QueryPlanner { for (int sourcePartIndex = 0; sourcePartIndex < sourceParts.size(); sourcePartIndex++) { String sourcePart = sourceParts.get(sourcePartIndex); if (sourcePartIndex < sourceParts.size() - 1) { - SchemaPlus sourcePartSchema = rootSchema.getSubSchema(sourcePart); + SchemaPlus sourcePartSchema = previousLevelSchema.getSubSchema(sourcePart); if (sourcePartSchema == null) { sourcePartSchema = 
previousLevelSchema.add(sourcePart, new AbstractSchema()); } http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java index 227a0f1..aeb7f35 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java @@ -42,7 +42,7 @@ import org.apache.samza.sql.interfaces.SamzaRelConverter; import org.apache.samza.sql.interfaces.SamzaRelConverterFactory; import org.apache.samza.sql.interfaces.SourceResolver; import org.apache.samza.sql.interfaces.SourceResolverFactory; -import org.apache.samza.sql.interfaces.SqlSystemStreamConfig; +import org.apache.samza.sql.interfaces.SqlSystemSourceConfig; import org.apache.samza.sql.interfaces.UdfMetadata; import org.apache.samza.sql.interfaces.UdfResolver; import org.apache.samza.sql.testutil.JsonUtil; @@ -50,7 +50,6 @@ import org.apache.samza.sql.testutil.ReflectionUtils; import org.apache.samza.sql.testutil.SamzaSqlQueryParser; import org.apache.samza.sql.testutil.SamzaSqlQueryParser.QueryInfo; import org.apache.samza.sql.testutil.SqlFileParser; -import org.apache.samza.system.SystemStream; import org.codehaus.jackson.type.TypeReference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -86,8 +85,8 @@ public class SamzaSqlApplicationConfig { private final Collection<UdfMetadata> udfMetadata; - private final Map<String, SqlSystemStreamConfig> inputSystemStreamConfigBySource; - private final Map<String, SqlSystemStreamConfig> outputSystemStreamConfigsBySource; + private final Map<String, SqlSystemSourceConfig> inputSystemStreamConfigBySource; + private final Map<String, 
SqlSystemSourceConfig> outputSystemStreamConfigsBySource; private final List<String> sql; @@ -109,7 +108,7 @@ public class SamzaSqlApplicationConfig { .flatMap(Collection::stream) .collect(Collectors.toMap(Function.identity(), sourceResolver::fetchSourceInfo)); - Set<SqlSystemStreamConfig> systemStreamConfigs = new HashSet<>(inputSystemStreamConfigBySource.values()); + Set<SqlSystemSourceConfig> systemStreamConfigs = new HashSet<>(inputSystemStreamConfigBySource.values()); outputSystemStreamConfigsBySource = queryInfo.stream() .map(QueryInfo::getOutputSource) @@ -117,13 +116,13 @@ public class SamzaSqlApplicationConfig { systemStreamConfigs.addAll(outputSystemStreamConfigsBySource.values()); relSchemaProvidersBySource = systemStreamConfigs.stream() - .collect(Collectors.toMap(SqlSystemStreamConfig::getSource, + .collect(Collectors.toMap(SqlSystemSourceConfig::getSource, x -> initializePlugin("RelSchemaProvider", x.getRelSchemaProviderName(), staticConfig, CFG_FMT_REL_SCHEMA_PROVIDER_DOMAIN, (o, c) -> ((RelSchemaProviderFactory) o).create(x.getSystemStream(), c)))); samzaRelConvertersBySource = systemStreamConfigs.stream() - .collect(Collectors.toMap(SqlSystemStreamConfig::getSource, + .collect(Collectors.toMap(SqlSystemSourceConfig::getSource, x -> initializePlugin("SamzaRelConverter", x.getSamzaRelConverterName(), staticConfig, CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN, (o, c) -> ((SamzaRelConverterFactory) o).create(x.getSystemStream(), relSchemaProvidersBySource.get(x.getSource()), c)))); @@ -226,11 +225,11 @@ public class SamzaSqlApplicationConfig { return udfMetadata; } - public Map<String, SqlSystemStreamConfig> getInputSystemStreamConfigBySource() { + public Map<String, SqlSystemSourceConfig> getInputSystemStreamConfigBySource() { return inputSystemStreamConfigBySource; } - public Map<String, SqlSystemStreamConfig> getOutputSystemStreamConfigsBySource() { + public Map<String, SqlSystemSourceConfig> getOutputSystemStreamConfigsBySource() { return 
outputSystemStreamConfigsBySource; } @@ -241,4 +240,8 @@ public class SamzaSqlApplicationConfig { public Map<String, RelSchemaProvider> getRelSchemaProviders() { return relSchemaProvidersBySource; } + + public SourceResolver getSourceResolver() { + return sourceResolver; + } } http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java index 83928e1..f54ca42 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java @@ -32,7 +32,7 @@ import org.apache.samza.runtime.ApplicationRunner; import org.apache.samza.runtime.LocalApplicationRunner; import org.apache.samza.runtime.RemoteApplicationRunner; import org.apache.samza.sql.interfaces.SourceResolver; -import org.apache.samza.sql.interfaces.SqlSystemStreamConfig; +import org.apache.samza.sql.interfaces.SqlSystemSourceConfig; import org.apache.samza.sql.testutil.SamzaSqlQueryParser; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -82,13 +82,13 @@ public class SamzaSqlApplicationRunner extends AbstractApplicationRunner { for (SamzaSqlQueryParser.QueryInfo query : queryInfo) { // Populate stream to system mapping config for input and output system streams for (String inputSource : query.getInputSources()) { - SqlSystemStreamConfig inputSystemStreamConfig = sourceResolver.fetchSourceInfo(inputSource); + SqlSystemSourceConfig inputSystemStreamConfig = sourceResolver.fetchSourceInfo(inputSource); newConfig.put(String.format(CFG_FMT_SAMZA_STREAM_SYSTEM, inputSystemStreamConfig.getStreamName()), inputSystemStreamConfig.getSystemName()); 
newConfig.putAll(inputSystemStreamConfig.getConfig()); } - SqlSystemStreamConfig outputSystemStreamConfig = sourceResolver.fetchSourceInfo(query.getOutputSource()); + SqlSystemSourceConfig outputSystemStreamConfig = sourceResolver.fetchSourceInfo(query.getOutputSource()); newConfig.put(String.format(CFG_FMT_SAMZA_STREAM_SYSTEM, outputSystemStreamConfig.getStreamName()), outputSystemStreamConfig.getSystemName()); newConfig.putAll(outputSystemStreamConfig.getConfig()); http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/main/java/org/apache/samza/sql/testutil/SamzaSqlQueryParser.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/testutil/SamzaSqlQueryParser.java b/samza-sql/src/main/java/org/apache/samza/sql/testutil/SamzaSqlQueryParser.java index dd5f3bc..faf903a 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/testutil/SamzaSqlQueryParser.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/testutil/SamzaSqlQueryParser.java @@ -23,7 +23,6 @@ import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -102,14 +101,14 @@ public class SamzaSqlQueryParser { String outputSource; String selectQuery; - String inputSource; + ArrayList<String> inputSources; if (sqlNode instanceof SqlInsert) { SqlInsert sqlInsert = ((SqlInsert) sqlNode); outputSource = sqlInsert.getTargetTable().toString(); if (sqlInsert.getSource() instanceof SqlSelect) { SqlSelect sqlSelect = (SqlSelect) sqlInsert.getSource(); selectQuery = m.group(2); - inputSource = getInputFromSelectQuery(sqlSelect); + inputSources = getInputsFromSelectQuery(sqlSelect); } else { throw new SamzaException("Sql query is not of the expected format"); } @@ -117,7 +116,7 @@ public class SamzaSqlQueryParser { throw new SamzaException("Sql 
query is not of the expected format"); } - return new QueryInfo(selectQuery, Collections.singletonList(inputSource), outputSource); + return new QueryInfo(selectQuery, inputSources, outputSource); } private static Planner createPlanner() { @@ -147,17 +146,17 @@ public class SamzaSqlQueryParser { return Frameworks.getPlanner(frameworkConfig); } - private static String getInputFromSelectQuery(SqlSelect sqlSelect) { + private static ArrayList<String> getInputsFromSelectQuery(SqlSelect sqlSelect) { ArrayList<String> input = new ArrayList<>(); getInput(sqlSelect.getFrom(), input); - if (input.size() != 1) { + if (input.size() < 1) { throw new SamzaException("Unsupported query " + sqlSelect); } - return input.get(0); + return input; } - private static void getInput(SqlNode node, ArrayList<String> inputSource) { + private static void getInput(SqlNode node, ArrayList<String> inputSourceList) { if (node instanceof SqlJoin) { SqlJoin joinNode = (SqlJoin) node; ArrayList<String> inputsLeft = new ArrayList<>(); @@ -165,24 +164,20 @@ public class SamzaSqlQueryParser { getInput(joinNode.getLeft(), inputsLeft); getInput(joinNode.getRight(), inputsRight); - if (!inputsLeft.isEmpty() && !inputsRight.isEmpty()) { - throw new SamzaException("Joins on two entities are not supported yet"); - } - - inputSource.addAll(inputsLeft); - inputSource.addAll(inputsRight); + inputSourceList.addAll(inputsLeft); + inputSourceList.addAll(inputsRight); } else if (node instanceof SqlIdentifier) { - inputSource.add(node.toString()); + inputSourceList.add(node.toString()); } else if (node instanceof SqlBasicCall) { SqlBasicCall basicCall = ((SqlBasicCall) node); if (basicCall.getOperator() instanceof SqlAsOperator) { - getInput(basicCall.operand(0), inputSource); + getInput(basicCall.operand(0), inputSourceList); } else if (basicCall.getOperator() instanceof SqlUnnestOperator && basicCall.operand(0) instanceof SqlSelect) { - inputSource.add(getInputFromSelectQuery(basicCall.operand(0))); + 
inputSourceList.addAll(getInputsFromSelectQuery(basicCall.operand(0))); return; } } else if (node instanceof SqlSelect) { - getInput(((SqlSelect) node).getFrom(), inputSource); + getInput(((SqlSelect) node).getFrom(), inputSourceList); } } } http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java index 686ac15..798f0b3 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java @@ -22,7 +22,10 @@ package org.apache.samza.sql.translator; import java.util.Arrays; import java.util.Collections; +import java.util.List; +import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.logical.LogicalFilter; +import org.apache.calcite.rex.RexNode; import org.apache.samza.operators.MessageStream; import org.apache.samza.sql.data.Expression; import org.apache.samza.sql.data.SamzaSqlRelMessage; @@ -34,16 +37,23 @@ import org.slf4j.LoggerFactory; * Translator to translate the LogicalFilter node in the relational graph to the corresponding StreamGraph * implementation */ -public class FilterTranslator { +class FilterTranslator { private static final Logger log = LoggerFactory.getLogger(FilterTranslator.class); - public void translate(final LogicalFilter filter, final TranslatorContext context) { + void translate(final LogicalFilter filter, final TranslatorContext context) { MessageStream<SamzaSqlRelMessage> inputStream = context.getMessageStream(filter.getInput().getId()); + MessageStream<SamzaSqlRelMessage> outputStream = translateFilter(inputStream, filter.getInputs(), + filter.getCondition(), context); + 
context.registerMessageStream(filter.getId(), outputStream); + } + + static MessageStream<SamzaSqlRelMessage> translateFilter(MessageStream<SamzaSqlRelMessage> inputStream, + List<RelNode> inputs, RexNode condition, final TranslatorContext context) { Expression expr = - context.getExpressionCompiler().compile(filter.getInputs(), Collections.singletonList(filter.getCondition())); + context.getExpressionCompiler().compile(inputs, Collections.singletonList(condition)); - MessageStream<SamzaSqlRelMessage> outputStream = inputStream.filter(message -> { + return inputStream.filter(message -> { Object[] result = new Object[1]; expr.execute(context.getExecutionContext(), context.getDataContext(), message.getFieldValues().toArray(), result); if (result.length > 0 && result[0] instanceof Boolean) { @@ -56,7 +66,5 @@ public class FilterTranslator { return false; } }); - - context.registerMessageStream(filter.getId(), outputStream); } } http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java new file mode 100644 index 0000000..70c1968 --- /dev/null +++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java @@ -0,0 +1,279 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. 
You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package org.apache.samza.sql.translator; + +import java.util.LinkedList; +import java.util.List; +import org.apache.calcite.adapter.enumerable.EnumerableTableScan; +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rel.logical.LogicalJoin; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlExplainFormat; +import org.apache.calcite.sql.SqlExplainLevel; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.commons.lang.Validate; +import org.apache.samza.SamzaException; +import org.apache.samza.operators.KV; +import org.apache.samza.operators.MessageStream; +import org.apache.samza.serializers.JsonSerdeV2; +import org.apache.samza.serializers.KVSerde; +import org.apache.samza.serializers.Serde; +import org.apache.samza.sql.data.SamzaSqlCompositeKey; +import org.apache.samza.sql.data.SamzaSqlRelMessage; +import org.apache.samza.sql.interfaces.SourceResolver; +import org.apache.samza.storage.kv.RocksDbTableDescriptor; +import org.apache.samza.table.Table; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.samza.sql.data.SamzaSqlCompositeKey.*; + + +/** + * Translator to translate the LogicalJoin node in the relational graph to the corresponding StreamGraph + * implementation. + * Join is supported with the following caveats: + * 0. 
Only local tables are supported. Remote/composite tables are not yet supported. + * 1. Only stream-table joins are supported. No stream-stream joins. + * 2. Only Equi-joins are supported. No theta-joins. + * 3. Inner joins, Left and Right outer joins are supported. No cross joins, full outer joins or natural joins. + * 4. Join condition with a constant is not supported. + * 5. Compound join condition with only AND operator is supported. AND operator with a constant is not supported. No + * support for OR operator or any other operator in the join condition. + * 6. Join condition with UDFs is not supported. Eg: udf1(a.key) = udf2(b.key) is not supported. + * + * It is assumed that the stream denoted as 'table' is already partitioned by the key(s) specified in the join + * condition. We do not repartition the table as bootstrap semantic is not propagated to the intermediate streams. + * Please refer SAMZA-1613 for more details on this. But we always repartition the stream by the key(s) specified in + * the join condition. + */ +class JoinTranslator { + + private static final Logger log = LoggerFactory.getLogger(JoinTranslator.class); + private int joinId; + private SourceResolver sourceResolver; + + JoinTranslator(int joinId, SourceResolver sourceResolver) { + this.joinId = joinId; + this.sourceResolver = sourceResolver; + } + + void translate(final LogicalJoin join, final TranslatorContext context) { + + // Do the validation of join query + validateJoinQuery(join); + + boolean isTablePosOnRight = isTable(join.getRight()); + List<Integer> streamKeyIds = new LinkedList<>(); + List<Integer> tableKeyIds = new LinkedList<>(); + + // Fetch the stream and table indices corresponding to the fields given in the join condition. 
+ populateStreamAndTableKeyIds(((RexCall) join.getCondition()).getOperands(), join, isTablePosOnRight, streamKeyIds, + tableKeyIds); + + JsonSerdeV2<SamzaSqlCompositeKey> keySerde = new JsonSerdeV2<>(SamzaSqlCompositeKey.class); + JsonSerdeV2<SamzaSqlRelMessage> relMsgSerde = new JsonSerdeV2<>(SamzaSqlRelMessage.class); + + Table table = loadLocalTable(isTablePosOnRight, tableKeyIds, keySerde, relMsgSerde, join, context); + + MessageStream<SamzaSqlRelMessage> inputStream = + isTablePosOnRight ? + context.getMessageStream(join.getLeft().getId()) : context.getMessageStream(join.getRight().getId()); + + List<String> streamFieldNames = (isTablePosOnRight ? join.getLeft() : join.getRight()).getRowType().getFieldNames(); + List<String> tableFieldNames = (isTablePosOnRight ? join.getRight() : join.getLeft()).getRowType().getFieldNames(); + Validate.isTrue(streamKeyIds.size() == tableKeyIds.size()); + log.info("Joining on the following Stream and Table field(s): "); + for (int i = 0; i < streamKeyIds.size(); i++) { + log.info(streamFieldNames.get(streamKeyIds.get(i)) + " with " + tableFieldNames.get(tableKeyIds.get(i))); + } + + SamzaSqlRelMessageJoinFunction joinFn = + new SamzaSqlRelMessageJoinFunction(join.getJoinType(), isTablePosOnRight, streamKeyIds, streamFieldNames, + tableFieldNames); + + // Always re-partition the messages from the input stream by the composite key and then join the messages + // with the table. 
+ MessageStream<SamzaSqlRelMessage> outputStream = + inputStream + .partitionBy(m -> createSamzaSqlCompositeKey(m, streamKeyIds), + m -> m, + KVSerde.of(keySerde, relMsgSerde), + "stream_" + joinId) + .map(KV::getValue) + .join(table, joinFn); + + context.registerMessageStream(join.getId(), outputStream); + } + + private void validateJoinQuery(LogicalJoin join) { + JoinRelType joinRelType = join.getJoinType(); + + if (joinRelType.compareTo(JoinRelType.INNER) != 0 && joinRelType.compareTo(JoinRelType.LEFT) != 0 + && joinRelType.compareTo(JoinRelType.RIGHT) != 0) { + throw new SamzaException("Query with only INNER and LEFT/RIGHT OUTER join are supported."); + } + + boolean isTablePosOnLeft = isTable(join.getLeft()); + boolean isTablePosOnRight = isTable(join.getRight()); + + if (!isTablePosOnLeft && !isTablePosOnRight) { + throw new SamzaException("Invalid query with both sides of join being denoted as 'stream'. " + + "Stream-stream join is not yet supported. " + dumpRelPlanForNode(join)); + } + + if (isTablePosOnLeft && isTablePosOnRight) { + throw new SamzaException("Invalid query with both sides of join being denoted as 'table'. " + + dumpRelPlanForNode(join)); + } + + if (joinRelType.compareTo(JoinRelType.LEFT) == 0 && isTablePosOnLeft && !isTablePosOnRight) { + throw new SamzaException("Invalid query for outer left join. Left side of the join should be a 'stream' and " + + "right side of join should be a 'table'. " + dumpRelPlanForNode(join)); + } + + if (joinRelType.compareTo(JoinRelType.RIGHT) == 0 && isTablePosOnRight && !isTablePosOnLeft) { + throw new SamzaException("Invalid query for outer right join. Left side of the join should be a 'table' and " + + "right side of join should be a 'stream'. " + dumpRelPlanForNode(join)); + } + + validateJoinCondition(join.getCondition()); + } + + private void validateJoinCondition(RexNode operand) { + if (!(operand instanceof RexCall)) { + throw new SamzaException("SQL Query is not supported. 
Join condition operand " + operand + + " is of type " + operand.getClass()); + } + + RexCall condition = (RexCall) operand; + + if (condition.isAlwaysTrue()) { + throw new SamzaException("Query results in a cross join, which is not supported. Please optimize the query." + + " It is expected that the joins should include JOIN ON operator in the sql query."); + } + + if (condition.getKind() != SqlKind.EQUALS && condition.getKind() != SqlKind.AND) { + throw new SamzaException("Only equi-joins and AND operator is supported in join condition."); + } + } + + // Fetch the stream and table key indices corresponding to the fields given in the join condition by parsing through + // the condition. Stream and table key indices are populated in streamKeyIds and tableKeyIds respectively. + private void populateStreamAndTableKeyIds(List<RexNode> operands, final LogicalJoin join, boolean isTablePosOnRight, + List<Integer> streamKeyIds, List<Integer> tableKeyIds) { + + // All non-leaf operands in the join condition should be expressions. + if (operands.get(0) instanceof RexCall) { + operands.forEach(operand -> { + validateJoinCondition(operand); + populateStreamAndTableKeyIds(((RexCall) operand).getOperands(), join, isTablePosOnRight, streamKeyIds, tableKeyIds); + }); + return; + } + + // We are at the leaf of the join condition. Only binary operators are supported. + Validate.isTrue(operands.size() == 2); + + // Only reference operands are supported in row expressions and not constants. + // a.key = b.key is supported with a.key and b.key being reference operands. + // a.key = "constant" is not yet supported. + if (!(operands.get(0) instanceof RexInputRef) || !(operands.get(1) instanceof RexInputRef)) { + throw new SamzaException("SQL query is not supported. 
Join condition " + join.getCondition() + " should have " + + "reference operands but the types are " + operands.get(0).getClass() + " and " + operands.get(1).getClass()); + } + + // Join condition is commutative, meaning, a.key = b.key is equivalent to b.key = a.key. + // Calcite assigns the indices to the fields based on the order a and b are specified in + // the sql 'from' clause. Let's put the operand with smaller index in leftRef and larger + // index in rightRef so that the order of operands in the join condition is in the order + // the stream and table are specified in the 'from' clause. + RexInputRef leftRef = (RexInputRef) operands.get(0); + RexInputRef rightRef = (RexInputRef) operands.get(1); + + // Let's validate the key used in the join condition. + validateKey(leftRef); + validateKey(rightRef); + + if (leftRef.getIndex() > rightRef.getIndex()) { + RexInputRef tmpRef = leftRef; + leftRef = rightRef; + rightRef = tmpRef; + } + + // Get the table key index and stream key index + int deltaKeyIdx = rightRef.getIndex() - join.getLeft().getRowType().getFieldCount(); + streamKeyIds.add(isTablePosOnRight ? leftRef.getIndex() : deltaKeyIdx); + tableKeyIds.add(isTablePosOnRight ? 
deltaKeyIdx : leftRef.getIndex()); + } + + private void validateKey(RexInputRef ref) { + SqlTypeName sqlTypeName = ref.getType().getSqlTypeName(); + // Only primitive types are supported in the key + if (sqlTypeName != SqlTypeName.BOOLEAN && sqlTypeName != SqlTypeName.TINYINT && sqlTypeName != SqlTypeName.SMALLINT + && sqlTypeName != SqlTypeName.INTEGER && sqlTypeName != SqlTypeName.CHAR && sqlTypeName != SqlTypeName.BIGINT + && sqlTypeName != SqlTypeName.VARCHAR && sqlTypeName != SqlTypeName.DOUBLE && sqlTypeName != SqlTypeName.FLOAT) { + log.error("Unsupported key type " + sqlTypeName + " used in join condition."); + throw new SamzaException("Unsupported key type used in join condition."); + } + } + + private String dumpRelPlanForNode(RelNode relNode) { + return RelOptUtil.dumpPlan("Rel expression: ", + relNode, SqlExplainFormat.TEXT, + SqlExplainLevel.EXPPLAN_ATTRIBUTES); + } + + private boolean isTable(RelNode relNode) { + // NOTE: Any intermediate form of a join is always a stream. Eg: For the second level join of + // stream-table-table join, the left side of the join is join output, which we always + // assume to be a stream. The intermediate stream won't be an instance of EnumerableTableScan. + return relNode instanceof EnumerableTableScan && + sourceResolver.isTable(String.join(".", relNode.getTable().getQualifiedName())); + } + + private Table loadLocalTable(boolean isTablePosOnRight, List<Integer> tableKeyIds, Serde keySerde, Serde relMsgSerde, + LogicalJoin join, TranslatorContext context) { + MessageStream<SamzaSqlRelMessage> inputTable = + isTablePosOnRight ? + context.getMessageStream(join.getRight().getId()) : context.getMessageStream(join.getLeft().getId()); + + // Create a table backed by RocksDb store with the fields in the join condition as composite key and relational + // message as the value. Send the messages from the input stream denoted as 'table' to the created table store. 
+ Table<KV<SamzaSqlCompositeKey, SamzaSqlRelMessage>> table = + context.getStreamGraph() + .getTable(new RocksDbTableDescriptor("table_" + joinId) + .withSerde(KVSerde.of(keySerde, relMsgSerde))); + + inputTable + .map(m -> new KV(createSamzaSqlCompositeKey(m, tableKeyIds), m)) + .sendTo(table); + + return table; + } + + private void logStringAndTableJoinKeys(List<String> fieldNames, List<Integer> fieldIds) { + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java index f5cc525..0f31fb6 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java @@ -42,11 +42,11 @@ import org.slf4j.LoggerFactory; * Translator to translate the Project node in the relational graph to the corresponding StreamGraph * implementation. 
*/ -public class ProjectTranslator { +class ProjectTranslator { private static final Logger LOG = LoggerFactory.getLogger(ProjectTranslator.class); - public void translate(final Project project, final TranslatorContext context) { + void translate(final Project project, final TranslatorContext context) { MessageStream<SamzaSqlRelMessage> messageStream = context.getMessageStream(project.getInput().getId()); List<Integer> flattenProjects = project.getProjects().stream().filter(this::isFlatten).map(this::getProjectIndex).collect(Collectors.toList()); http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java index 87e37f4..b853537 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java @@ -24,6 +24,7 @@ import org.apache.calcite.rel.RelRoot; import org.apache.calcite.rel.RelShuttleImpl; import org.apache.calcite.rel.core.TableScan; import org.apache.calcite.rel.logical.LogicalFilter; +import org.apache.calcite.rel.logical.LogicalJoin; import org.apache.calcite.rel.logical.LogicalProject; import org.apache.samza.operators.KV; import org.apache.samza.operators.MessageStream; @@ -32,7 +33,8 @@ import org.apache.samza.operators.StreamGraph; import org.apache.samza.sql.data.SamzaSqlExecutionContext; import org.apache.samza.sql.data.SamzaSqlRelMessage; import org.apache.samza.sql.interfaces.SamzaRelConverter; -import org.apache.samza.sql.interfaces.SqlSystemStreamConfig; +import org.apache.samza.sql.interfaces.SourceResolver; +import org.apache.samza.sql.interfaces.SqlSystemSourceConfig; import org.apache.samza.sql.planner.QueryPlanner; import 
org.apache.samza.sql.runner.SamzaSqlApplicationConfig; import org.apache.samza.sql.testutil.SamzaSqlQueryParser; @@ -62,6 +64,7 @@ public class QueryTranslator { final RelRoot relRoot = planner.plan(queryInfo.getSelectQuery()); final TranslatorContext context = new TranslatorContext(streamGraph, relRoot, executionContext); final RelNode node = relRoot.project(); + final int[] joinId = new int[1]; node.accept(new RelShuttleImpl() { @Override @@ -84,9 +87,18 @@ public class QueryTranslator { new ProjectTranslator().translate(project, context); return node; } + + @Override + public RelNode visit(LogicalJoin join) { + RelNode node = super.visit(join); + joinId[0]++; + SourceResolver sourceResolver = context.getExecutionContext().getSamzaSqlApplicationConfig().getSourceResolver(); + new JoinTranslator(joinId[0], sourceResolver).translate(join, context); + return node; + } }); - SqlSystemStreamConfig outputSystemConfig = + SqlSystemSourceConfig outputSystemConfig = sqlConfig.getOutputSystemStreamConfigsBySource().get(queryInfo.getOutputSource()); SamzaRelConverter samzaMsgConverter = sqlConfig.getSamzaRelConverters().get(queryInfo.getOutputSource()); MessageStreamImpl<SamzaSqlRelMessage> stream = http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRelMessageJoinFunction.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRelMessageJoinFunction.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRelMessageJoinFunction.java new file mode 100644 index 0000000..69e4e09 --- /dev/null +++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRelMessageJoinFunction.java @@ -0,0 +1,121 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. 
See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package org.apache.samza.sql.translator; + +import java.util.ArrayList; +import java.util.List; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.commons.lang.Validate; +import org.apache.samza.operators.KV; +import org.apache.samza.operators.functions.StreamTableJoinFunction; +import org.apache.samza.sql.data.SamzaSqlCompositeKey; +import org.apache.samza.sql.data.SamzaSqlRelMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.samza.sql.data.SamzaSqlCompositeKey.*; + + +/** + * This class joins incoming {@link SamzaSqlRelMessage} from a stream with the records in a table with the join key + * being {@link SamzaSqlCompositeKey} + */ +public class SamzaSqlRelMessageJoinFunction + implements StreamTableJoinFunction<SamzaSqlCompositeKey, SamzaSqlRelMessage, KV<SamzaSqlCompositeKey, SamzaSqlRelMessage>, SamzaSqlRelMessage> { + + private static final Logger log = LoggerFactory.getLogger(SamzaSqlRelMessageJoinFunction.class); + + private final JoinRelType joinRelType; + private final boolean isTablePosOnRight; + private final List<Integer> streamFieldIds; + // Table field names are used in the outer join when the table record is not found. 
+ private final List<String> tableFieldNames; + private final List<String> outFieldNames; + + /** + * Creates a join function for a stream-table join. + * + * @param joinRelType type of the join; only INNER, LEFT (table on the right) and RIGHT (table on the left) are + * accepted + * @param isTablePosOnRight whether the table is on the right-hand side of the join in the 'from' clause + * @param streamFieldIds ids of the stream message fields used to build the join key (see getMessageKey) + * @param streamFieldNames names of the stream message fields, in the same order as the message field values + * @param tableFieldNames names of the table record fields, in the same order as the record field values + * @throws IllegalArgumentException if the join type / table position combination is not supported + */ + public SamzaSqlRelMessageJoinFunction(JoinRelType joinRelType, boolean isTablePosOnRight, + List<Integer> streamFieldIds, List<String> streamFieldNames, List<String> tableFieldNames) { + this.joinRelType = joinRelType; + this.isTablePosOnRight = isTablePosOnRight; + // Outer joins are only accepted when the preserved side is the stream: LEFT needs the table on the right and + // RIGHT needs the table on the left. + Validate.isTrue((joinRelType == JoinRelType.LEFT && isTablePosOnRight) || + (joinRelType == JoinRelType.RIGHT && !isTablePosOnRight) || + joinRelType == JoinRelType.INNER, + String.format("Unsupported stream-table join type %s with the table on the %s", joinRelType, + isTablePosOnRight ? "right" : "left")); + this.streamFieldIds = streamFieldIds; + this.tableFieldNames = tableFieldNames; + this.outFieldNames = new ArrayList<>(streamFieldNames.size() + tableFieldNames.size()); + if (isTablePosOnRight) { + outFieldNames.addAll(streamFieldNames); + } + outFieldNames.addAll(tableFieldNames); + if (!isTablePosOnRight) { + outFieldNames.addAll(streamFieldNames); + } + } + + /** + * Joins a stream message with the table record looked up for it. + * + * @param message the incoming stream message + * @param record the matching table record, or null when no record was found + * @return the joined message, or null for an inner join without a matching record + */ + @Override + public SamzaSqlRelMessage apply(SamzaSqlRelMessage message, KV<SamzaSqlCompositeKey, SamzaSqlRelMessage> record) { + + if (joinRelType == JoinRelType.INNER && record == null) { + // Guarded so the composite key is only built when debug logging is actually enabled. + if (log.isDebugEnabled()) { + log.debug("Inner Join: Record not found for the message with key: {}", getMessageKey(message)); + } + // Returning null would result in Join operator implementation to filter out the message. + return null; + } + + // The resulting join output should be a SamzaSqlRelMessage containing the fields from both the stream message and + // table record. The order of stream message fields and table record fields are dictated by the position of stream + // and table in the 'from' clause of sql query. The output should also include the keys from both the stream message + // and the table record. + List<Object> outFieldValues = new ArrayList<>(outFieldNames.size()); + + // If table position is on the right, add the stream message fields first + if (isTablePosOnRight) { + outFieldValues.addAll(message.getFieldValues()); + } + + // Add the table record fields. 
+ if (record != null) { + outFieldValues.addAll(record.getValue().getFieldValues()); + } else { + // Table record could be null as the record could not be found in the store. This can + // happen for outer joins. Add nulls to all the field values in the output message. + tableFieldNames.forEach(s -> outFieldValues.add(null)); + } + + // If table position is on the left, add the stream message fields last + if (!isTablePosOnRight) { + outFieldValues.addAll(message.getFieldValues()); + } + + return new SamzaSqlRelMessage(outFieldNames, outFieldValues); + } + + @Override + public SamzaSqlCompositeKey getMessageKey(SamzaSqlRelMessage message) { + return createSamzaSqlCompositeKey(message, streamFieldIds); + } + + @Override + public SamzaSqlCompositeKey getRecordKey(KV<SamzaSqlCompositeKey, SamzaSqlRelMessage> record) { + return record.getKey(); + } + + @Override + public void close() { + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java index 30e5a9b..13300f7 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java @@ -28,27 +28,27 @@ import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.StreamGraph; import org.apache.samza.sql.data.SamzaSqlRelMessage; import org.apache.samza.sql.interfaces.SamzaRelConverter; -import org.apache.samza.sql.interfaces.SqlSystemStreamConfig; +import org.apache.samza.sql.interfaces.SqlSystemSourceConfig; /** * Translator to translate the TableScans in relational graph to the corresponding input streams in the StreamGraph * implementation */ -public class ScanTranslator { +class
ScanTranslator { private final Map<String, SamzaRelConverter> relMsgConverters; - private final Map<String, SqlSystemStreamConfig> systemStreamConfig; + private final Map<String, SqlSystemSourceConfig> systemStreamConfig; - public ScanTranslator(Map<String, SamzaRelConverter> converters, Map<String, SqlSystemStreamConfig> ssc) { + ScanTranslator(Map<String, SamzaRelConverter> converters, Map<String, SqlSystemSourceConfig> ssc) { relMsgConverters = converters; this.systemStreamConfig = ssc; } - public void translate(final TableScan tableScan, final TranslatorContext context) { + void translate(final TableScan tableScan, final TranslatorContext context) { StreamGraph streamGraph = context.getStreamGraph(); List<String> tableNameParts = tableScan.getTable().getQualifiedName(); - String sourceName = SqlSystemStreamConfig.getSourceFromSourceParts(tableNameParts); + String sourceName = SqlSystemSourceConfig.getSourceFromSourceParts(tableNameParts); Validate.isTrue(relMsgConverters.containsKey(sourceName), String.format("Unknown source %s", sourceName)); SamzaRelConverter converter = relMsgConverters.get(sourceName);
