Samza SQL: Code re-org to accommodate Samza SQL engine to take Calcite IR as input
This PR has the following changes: - Let QueryTranslator take Calcite IR as input - Include 'INSERT INTO' sql statement for Calcite plan - Basic DSLConverter Framework with SamzaSQL dialect as an example - Some fixes to stream-table join wrt Serde Author: Aditya Toomula <[email protected]> Reviewers: Srinivasulu <[email protected]>, Weiqing <[email protected]> Closes #630 from atoomula/dsl3 and squashes the following commits: 93c66cee [Aditya Toomula] Samza SQL: Code re-org to accomodate Samza SQL engine to take Calcite IR as input. 21c0175b [Aditya Toomula] Samza SQL: Code re-org to accomodate Samza SQL engine to take Calcite IR as input. 15a1e9fb [Aditya Toomula] Samza SQL: Code re-org to accomodate Samza SQL engine to take Calcite IR as input. 5bf0c7e1 [Aditya Toomula] Samza SQL: Code re-org to accomodate Samza SQL engine to take Calcite IR as input. 98cd9777 [Aditya Toomula] Samza SQL: Code re-org to accomodate Samza SQL engine to take Calcite IR as input. 63a66fb1 [Aditya Toomula] Samza SQL: Code re-org to accomodate Samza SQL engine to take Calcite IR as input. 6794b512 [Aditya Toomula] Samza SQL: Code re-org to accomodate Samza SQL engine to take Calcite IR as input. c9d434a9 [Aditya Toomula] Samza SQL: Code re-org to accomodate Samza SQL engine to take Calcite IR as input. 94e53b64 [Aditya Toomula] Samza SQL: Code re-org to accomodate Samza SQL engine to take Calcite IR as input. 30c76ed9 [Aditya Toomula] Samza SQL: Code re-org to accomodate Samza SQL engine to take Calcite IR as input. 
Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/dec16392 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/dec16392 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/dec16392 Branch: refs/heads/NewKafkaSystemConsumer Commit: dec16392de2f5d323b6b1b3acf8de1689038f44d Parents: db6996e Author: Aditya Toomula <[email protected]> Authored: Thu Sep 20 14:22:38 2018 -0700 Committer: Srinivasulu Punuru <[email protected]> Committed: Thu Sep 20 14:22:38 2018 -0700 ---------------------------------------------------------------------- .../samza/sql/data/RexToJavaCompiler.java | 5 +- .../samza/sql/dsl/SamzaSqlDslConverter.java | 96 ++++++ .../sql/dsl/SamzaSqlDslConverterFactory.java | 33 ++ .../samza/sql/interfaces/DslConverter.java | 37 ++ .../sql/interfaces/DslConverterFactory.java | 36 ++ .../samza/sql/interfaces/SamzaSqlDriver.java | 56 +++ .../interfaces/SamzaSqlJavaTypeFactoryImpl.java | 72 ++++ .../samza/sql/runner/SamzaSqlApplication.java | 30 +- .../sql/runner/SamzaSqlApplicationConfig.java | 117 ++++--- .../sql/runner/SamzaSqlApplicationRunner.java | 41 ++- .../samza/sql/testutil/SamzaSqlQueryParser.java | 21 +- .../samza/sql/translator/JoinTranslator.java | 1 + .../samza/sql/translator/ModifyTranslator.java | 117 +++++++ .../samza/sql/translator/QueryTranslator.java | 90 ++--- .../samza/sql/translator/ScanTranslator.java | 10 +- .../apache/samza/sql/e2e/TestSamzaSqlTable.java | 4 +- .../runner/TestSamzaSqlApplicationConfig.java | 49 ++- .../runner/TestSamzaSqlApplicationRunner.java | 2 +- .../samza/sql/system/TestAvroSystemFactory.java | 3 +- .../samza/sql/testutil/SamzaSqlTestConfig.java | 3 + .../sql/testutil/TestSamzaSqlFileParser.java | 1 + .../sql/translator/TestQueryTranslator.java | 345 +++++++++++++------ .../test/samzasql/TestSamzaSqlEndToEnd.java | 64 +++- 23 files changed, 948 insertions(+), 285 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-sql/src/main/java/org/apache/samza/sql/data/RexToJavaCompiler.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/RexToJavaCompiler.java b/samza-sql/src/main/java/org/apache/samza/sql/data/RexToJavaCompiler.java index 21c81a8..1cfa95f 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/data/RexToJavaCompiler.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/data/RexToJavaCompiler.java @@ -49,6 +49,7 @@ import org.apache.calcite.rex.RexProgram; import org.apache.calcite.rex.RexProgramBuilder; import org.apache.calcite.util.Pair; import org.apache.samza.SamzaException; +import org.apache.samza.sql.interfaces.SamzaSqlJavaTypeFactoryImpl; import org.codehaus.commons.compiler.CompileException; import org.codehaus.commons.compiler.CompilerFactoryFactory; import org.codehaus.commons.compiler.IClassBodyEvaluator; @@ -114,11 +115,11 @@ public class RexToJavaCompiler { final ParameterExpression root = DataContext.ROOT; final ParameterExpression inputValues = Expressions.parameter(Object[].class, "inputValues"); final ParameterExpression outputValues = Expressions.parameter(Object[].class, "outputValues"); - final JavaTypeFactoryImpl javaTypeFactory = new JavaTypeFactoryImpl(rexBuilder.getTypeFactory().getTypeSystem()); + final JavaTypeFactoryImpl javaTypeFactory = new SamzaSqlJavaTypeFactoryImpl(rexBuilder.getTypeFactory().getTypeSystem()); // public void execute(Object[] inputValues, Object[] outputValues) final RexToLixTranslator.InputGetter inputGetter = new RexToLixTranslator.InputGetterImpl(ImmutableList.of( - Pair.<org.apache.calcite.linq4j.tree.Expression, PhysType>of( + Pair.of( Expressions.variable(Object[].class, "inputValues"), PhysTypeImpl.of(javaTypeFactory, inputRowType, JavaRowFormat.ARRAY, false)))); 
http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverter.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverter.java b/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverter.java new file mode 100644 index 0000000..4ec6f4a --- /dev/null +++ b/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverter.java @@ -0,0 +1,96 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. 
+*/ + +package org.apache.samza.sql.dsl; + +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.calcite.rel.RelRoot; +import org.apache.commons.lang3.StringUtils; +import org.apache.samza.SamzaException; +import org.apache.samza.config.Config; +import org.apache.samza.sql.interfaces.DslConverter; +import org.apache.samza.sql.planner.QueryPlanner; +import org.apache.samza.sql.runner.SamzaSqlApplicationConfig; +import org.apache.samza.sql.testutil.SamzaSqlQueryParser; +import org.apache.samza.sql.testutil.SqlFileParser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.samza.sql.runner.SamzaSqlApplicationConfig.*; + + +public class SamzaSqlDslConverter implements DslConverter { + + private static final Logger LOG = LoggerFactory.getLogger(SamzaSqlDslConverter.class); + + private final Config config; + + SamzaSqlDslConverter(Config config) { + this.config = config; + } + + @Override + public Collection<RelRoot> convertDsl(String dsl) { + // TODO: Introduce an API to parse a dsl string and return one or more sql statements + List<String> sqlStmts = fetchSqlFromConfig(config); + List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts); + SamzaSqlApplicationConfig sqlConfig = new SamzaSqlApplicationConfig(config, + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream) + .collect(Collectors.toSet()), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet())); + + QueryPlanner planner = + new QueryPlanner(sqlConfig.getRelSchemaProviders(), sqlConfig.getSystemStreamConfigsBySource(), + sqlConfig.getUdfMetadata()); + + List<RelRoot> relRoots = new LinkedList<>(); + for (String sql: sqlStmts) { + relRoots.add(planner.plan(sql)); + } + return relRoots; + } + + public static List<SamzaSqlQueryParser.QueryInfo> 
fetchQueryInfo(List<String> sqlStmts) { + return sqlStmts.stream().map(SamzaSqlQueryParser::parseQuery).collect(Collectors.toList()); + } + + public static List<String> fetchSqlFromConfig(Map<String, String> config) { + List<String> sql; + if (config.containsKey(CFG_SQL_STMT) && StringUtils.isNotBlank(config.get(CFG_SQL_STMT))) { + String sqlValue = config.get(CFG_SQL_STMT); + sql = Collections.singletonList(sqlValue); + } else if (config.containsKey(CFG_SQL_STMTS_JSON) && StringUtils.isNotBlank(config.get(CFG_SQL_STMTS_JSON))) { + sql = deserializeSqlStmts(config.get(CFG_SQL_STMTS_JSON)); + } else if (config.containsKey(CFG_SQL_FILE)) { + String sqlFile = config.get(CFG_SQL_FILE); + sql = SqlFileParser.parseSqlFile(sqlFile); + } else { + String msg = "Config doesn't contain the SQL that needs to be executed."; + LOG.error(msg); + throw new SamzaException(msg); + } + + return sql; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverterFactory.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverterFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverterFactory.java new file mode 100644 index 0000000..5176453 --- /dev/null +++ b/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverterFactory.java @@ -0,0 +1,33 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. 
You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package org.apache.samza.sql.dsl; + +import org.apache.samza.config.Config; +import org.apache.samza.sql.interfaces.DslConverter; +import org.apache.samza.sql.interfaces.DslConverterFactory; + + +public class SamzaSqlDslConverterFactory implements DslConverterFactory { + + @Override + public DslConverter create(Config config) { + return new SamzaSqlDslConverter(config); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-sql/src/main/java/org/apache/samza/sql/interfaces/DslConverter.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/DslConverter.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/DslConverter.java new file mode 100644 index 0000000..fc2ca8e --- /dev/null +++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/DslConverter.java @@ -0,0 +1,37 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. 
You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package org.apache.samza.sql.interfaces; + +import java.util.Collection; +import org.apache.calcite.rel.RelRoot; + + +/** + * Samza SQL Application uses {@link DslConverter} to convert the input dsl to Calcite logical plan. + */ +public interface DslConverter { + + /** + * Convert the dsl into the Calcite logical plan. + * @return List of Root nodes of the Calcite logical plan. + * If DSL represents multiple SQL statements. You might return root nodes one for each SQL statement. + */ + Collection<RelRoot> convertDsl(String dsl); +} http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-sql/src/main/java/org/apache/samza/sql/interfaces/DslConverterFactory.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/DslConverterFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/DslConverterFactory.java new file mode 100644 index 0000000..d42a96f --- /dev/null +++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/DslConverterFactory.java @@ -0,0 +1,36 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. 
You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package org.apache.samza.sql.interfaces; + +import org.apache.samza.config.Config; + + +/** + * Factory that is used to create {@link DslConverter} + */ +public interface DslConverterFactory { + + /** + * Create a {@link DslConverter} given the config + * @param config config needed to create the {@link DslConverter} + * @return {@link DslConverter} object created. + */ + DslConverter create(Config config); +} http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaSqlDriver.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaSqlDriver.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaSqlDriver.java new file mode 100644 index 0000000..5c86df9 --- /dev/null +++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaSqlDriver.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.samza.sql.interfaces; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.List; +import java.util.Properties; +import org.apache.calcite.adapter.java.JavaTypeFactory; +import org.apache.calcite.avatica.AvaticaConnection; +import org.apache.calcite.avatica.ConnectStringParser; +import org.apache.calcite.avatica.Meta; +import org.apache.calcite.jdbc.CalciteFactory; +import org.apache.calcite.jdbc.Driver; + + +/** + * Calcite JDBC driver for SamzaSQL which takes in a {@link JavaTypeFactory} + */ +public class SamzaSqlDriver extends Driver { + + private JavaTypeFactory typeFactory; + + public SamzaSqlDriver(JavaTypeFactory typeFactory) { + this.typeFactory = typeFactory; + } + + @Override + public Connection connect(String url, Properties info) throws SQLException { + if (!acceptsURL(url)) { + return null; + } + final String prefix = getConnectStringPrefix(); + assert url.startsWith(prefix); + final String urlSuffix = url.substring(prefix.length()); + final Properties info2 = ConnectStringParser.parse(urlSuffix, info); + final AvaticaConnection connection = + ((CalciteFactory) factory).newConnection(this, factory, url, info2, null, typeFactory); + handler.onConnectionInit(connection); + return connection; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaSqlJavaTypeFactoryImpl.java ---------------------------------------------------------------------- diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaSqlJavaTypeFactoryImpl.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaSqlJavaTypeFactoryImpl.java new file mode 100644 index 0000000..50001c6 --- /dev/null +++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaSqlJavaTypeFactoryImpl.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.samza.sql.interfaces; + +import com.google.common.collect.Lists; +import org.apache.calcite.jdbc.JavaTypeFactoryImpl; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeSystem; +import org.apache.calcite.rel.type.RelRecordType; +import org.apache.calcite.sql.type.JavaToSqlTypeConversionRules; +import org.apache.calcite.sql.type.SqlTypeName; + + +/** + * Calcite does validation of projected field types in select statement with the output schema types. If one of the + * projected fields is an UDF with return type of {@link Object} or any other java type not defined in + * {@link JavaToSqlTypeConversionRules}, using the default {@link JavaTypeFactoryImpl} results in validation failure. 
+ * Hence, extending {@link JavaTypeFactoryImpl} to make Calcite validation work with all output types of Samza SQL UDFs. + */ +public class SamzaSqlJavaTypeFactoryImpl + extends JavaTypeFactoryImpl { + + public SamzaSqlJavaTypeFactoryImpl() { + this(RelDataTypeSystem.DEFAULT); + } + + public SamzaSqlJavaTypeFactoryImpl(RelDataTypeSystem typeSystem) { + super(typeSystem); + } + + @Override + public RelDataType toSql(RelDataType type) { + return toSql(this, type); + } + + /** Converts a type in Java format to a SQL-oriented type. */ + public static RelDataType toSql(final RelDataTypeFactory typeFactory, + RelDataType type) { + if (type instanceof RelRecordType) { + return typeFactory.createStructType( + Lists.transform(type.getFieldList(), a0 -> toSql(typeFactory, a0.getType())), + type.getFieldNames()); + } + if (type instanceof JavaType) { + SqlTypeName typeName = JavaToSqlTypeConversionRules.instance().lookup(((JavaType) type).getJavaClass()); + // For unknown sql type names, return ANY sql type to make Calcite validation not fail. 
+ if (typeName == null) { + typeName = SqlTypeName.ANY; + } + return typeFactory.createTypeWithNullability( + typeFactory.createSqlType(typeName), + type.isNullable()); + } else { + return JavaTypeFactoryImpl.toSql(typeFactory, type); + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java index 9a871d7..fd1a2a8 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java @@ -19,10 +19,14 @@ package org.apache.samza.sql.runner; +import java.util.Collection; +import java.util.HashSet; import java.util.List; +import java.util.Set; +import org.apache.calcite.rel.RelRoot; import org.apache.samza.application.StreamApplication; import org.apache.samza.application.StreamApplicationDescriptor; -import org.apache.samza.sql.testutil.SamzaSqlQueryParser; +import org.apache.samza.sql.dsl.SamzaSqlDslConverter; import org.apache.samza.sql.translator.QueryTranslator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,12 +42,26 @@ public class SamzaSqlApplication implements StreamApplication { @Override public void describe(StreamApplicationDescriptor appDesc) { try { - SamzaSqlApplicationConfig sqlConfig = new SamzaSqlApplicationConfig(appDesc.getConfig()); + // TODO: Introduce an API to return a dsl string containing one or more sql statements. + List<String> dslStmts = SamzaSqlDslConverter.fetchSqlFromConfig(appDesc.getConfig()); + + // 1. 
Get Calcite plan + Set<String> inputSystemStreams = new HashSet<>(); + Set<String> outputSystemStreams = new HashSet<>(); + + Collection<RelRoot> relRoots = + SamzaSqlApplicationConfig.populateSystemStreamsAndGetRelRoots(dslStmts, appDesc.getConfig(), + inputSystemStreams, outputSystemStreams); + + // 2. Populate configs + SamzaSqlApplicationConfig sqlConfig = + new SamzaSqlApplicationConfig(appDesc.getConfig(), inputSystemStreams, outputSystemStreams); + + // 3. Translate Calcite plan to Samza stream operators QueryTranslator queryTranslator = new QueryTranslator(sqlConfig); - List<SamzaSqlQueryParser.QueryInfo> queries = sqlConfig.getQueryInfo(); - for (SamzaSqlQueryParser.QueryInfo query : queries) { - LOG.info("Translating the query {} to samza stream graph", query.getSelectQuery()); - queryTranslator.translate(query, appDesc); + for (RelRoot relRoot : relRoots) { + LOG.info("Translating relRoot {} to samza stream graph", relRoot); + queryTranslator.translate(relRoot, appDesc); } } catch (RuntimeException e) { LOG.error("SamzaSqlApplication threw exception.", e); http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java index 997312f..415cfdd 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java @@ -20,7 +20,6 @@ package org.apache.samza.sql.runner; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -30,12 +29,18 @@ import java.util.Set; import java.util.function.BiFunction; import java.util.function.Function; import 
java.util.stream.Collectors; +import org.apache.calcite.rel.BiRel; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelRoot; +import org.apache.calcite.rel.core.TableModify; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.Validate; -import org.apache.samza.SamzaException; import org.apache.samza.config.Config; import org.apache.samza.config.MapConfig; +import org.apache.samza.sql.dsl.SamzaSqlDslConverterFactory; import org.apache.samza.sql.impl.ConfigBasedUdfResolver; +import org.apache.samza.sql.interfaces.DslConverter; +import org.apache.samza.sql.interfaces.DslConverterFactory; import org.apache.samza.sql.interfaces.RelSchemaProvider; import org.apache.samza.sql.interfaces.RelSchemaProviderFactory; import org.apache.samza.sql.interfaces.SamzaRelConverter; @@ -47,9 +52,6 @@ import org.apache.samza.sql.interfaces.UdfMetadata; import org.apache.samza.sql.interfaces.UdfResolver; import org.apache.samza.sql.testutil.JsonUtil; import org.apache.samza.sql.testutil.ReflectionUtils; -import org.apache.samza.sql.testutil.SamzaSqlQueryParser; -import org.apache.samza.sql.testutil.SamzaSqlQueryParser.QueryInfo; -import org.apache.samza.sql.testutil.SqlFileParser; import org.codehaus.jackson.type.TypeReference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -92,37 +94,25 @@ public class SamzaSqlApplicationConfig { private final Map<String, SqlIOConfig> inputSystemStreamConfigBySource; private final Map<String, SqlIOConfig> outputSystemStreamConfigsBySource; - - private final List<String> sql; - - private final List<QueryInfo> queryInfo; + private final Map<String, SqlIOConfig> systemStreamConfigsBySource; private final long windowDurationMs; - public SamzaSqlApplicationConfig(Config staticConfig) { - - sql = fetchSqlFromConfig(staticConfig); - - queryInfo = fetchQueryInfo(sql); + public SamzaSqlApplicationConfig(Config staticConfig, Set<String> inputSystemStreams, + Set<String> outputSystemStreams) { ioResolver = 
createIOResolver(staticConfig); - udfResolver = createUdfResolver(staticConfig); - udfMetadata = udfResolver.getUdfs(); + inputSystemStreamConfigBySource = inputSystemStreams.stream() + .collect(Collectors.toMap(Function.identity(), src -> ioResolver.fetchSourceInfo(src))); - inputSystemStreamConfigBySource = queryInfo.stream() - .map(QueryInfo::getSources) - .flatMap(Collection::stream) - .distinct() - .collect(Collectors.toMap(Function.identity(), ioResolver::fetchSourceInfo)); + outputSystemStreamConfigsBySource = outputSystemStreams.stream() + .collect(Collectors.toMap(Function.identity(), x -> ioResolver.fetchSinkInfo(x))); - Set<SqlIOConfig> systemStreamConfigs = new HashSet<>(inputSystemStreamConfigBySource.values()); + systemStreamConfigsBySource = new HashMap<>(inputSystemStreamConfigBySource); + systemStreamConfigsBySource.putAll(outputSystemStreamConfigsBySource); - outputSystemStreamConfigsBySource = queryInfo.stream() - .map(QueryInfo::getSink) - .distinct() - .collect(Collectors.toMap(Function.identity(), ioResolver::fetchSinkInfo)); - systemStreamConfigs.addAll(outputSystemStreamConfigsBySource.values()); + Set<SqlIOConfig> systemStreamConfigs = new HashSet<>(systemStreamConfigsBySource.values()); relSchemaProvidersBySource = systemStreamConfigs.stream() .collect(Collectors.toMap(SqlIOConfig::getSource, @@ -136,6 +126,9 @@ public class SamzaSqlApplicationConfig { CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN, (o, c) -> ((SamzaRelConverterFactory) o).create(x.getSystemStream(), relSchemaProvidersBySource.get(x.getSource()), c)))); + udfResolver = createUdfResolver(staticConfig); + udfMetadata = udfResolver.getUdfs(); + windowDurationMs = staticConfig.getLong(CFG_GROUPBY_WINDOW_DURATION_MS, DEFAULT_GROUPBY_WINDOW_DURATION_MS); } @@ -151,30 +144,7 @@ public class SamzaSqlApplicationConfig { return factoryInvoker.apply(factory, pluginConfig); } - public static List<QueryInfo> fetchQueryInfo(List<String> sqlStmts) { - return 
sqlStmts.stream().map(SamzaSqlQueryParser::parseQuery).collect(Collectors.toList()); - } - - public static List<String> fetchSqlFromConfig(Map<String, String> config) { - List<String> sql; - if (config.containsKey(CFG_SQL_STMT) && StringUtils.isNotBlank(config.get(CFG_SQL_STMT))) { - String sqlValue = config.get(CFG_SQL_STMT); - sql = Collections.singletonList(sqlValue); - } else if (config.containsKey(CFG_SQL_STMTS_JSON) && StringUtils.isNotBlank(config.get(CFG_SQL_STMTS_JSON))) { - sql = deserializeSqlStmts(config.get(CFG_SQL_STMTS_JSON)); - } else if (config.containsKey(CFG_SQL_FILE)) { - String sqlFile = config.get(CFG_SQL_FILE); - sql = SqlFileParser.parseSqlFile(sqlFile); - } else { - String msg = "Config doesn't contain the SQL that needs to be executed."; - LOG.error(msg); - throw new SamzaException(msg); - } - - return sql; - } - - private static List<String> deserializeSqlStmts(String value) { + public static List<String> deserializeSqlStmts(String value) { Validate.notEmpty(value, "json Value is not set or empty"); return JsonUtil.fromJson(value, new TypeReference<List<String>>() { }); @@ -224,12 +194,45 @@ public class SamzaSqlApplicationConfig { return ret; } - public List<String> getSql() { - return sql; + public static Collection<RelRoot> populateSystemStreamsAndGetRelRoots(List<String> dslStmts, Config config, + Set<String> inputSystemStreams, Set<String> outputSystemStreams) { + // TODO: Get the converter factory based on the file type. Create abstraction around this. 
+ DslConverterFactory dslConverterFactory = new SamzaSqlDslConverterFactory(); + DslConverter dslConverter = dslConverterFactory.create(config); + + Collection<RelRoot> relRoots = dslConverter.convertDsl(String.join("\n", dslStmts)); + + for (RelRoot relRoot : relRoots) { + SamzaSqlApplicationConfig.populateSystemStreams(relRoot.project(), inputSystemStreams, outputSystemStreams); + } + + return relRoots; + } + + private static void populateSystemStreams(RelNode relNode, Set<String> inputSystemStreams, + Set<String> outputSystemStreams) { + if (relNode instanceof TableModify) { + outputSystemStreams.add(getSystemStreamName(relNode)); + } else { + if (relNode instanceof BiRel) { + BiRel biRelNode = (BiRel) relNode; + populateSystemStreams(biRelNode.getLeft(), inputSystemStreams, outputSystemStreams); + populateSystemStreams(biRelNode.getRight(), inputSystemStreams, outputSystemStreams); + } else { + if (relNode.getTable() != null) { + inputSystemStreams.add(getSystemStreamName(relNode)); + } + } + } + List<RelNode> relNodes = relNode.getInputs(); + if (relNodes == null || relNodes.isEmpty()) { + return; + } + relNodes.forEach(node -> populateSystemStreams(node, inputSystemStreams, outputSystemStreams)); } - public List<QueryInfo> getQueryInfo() { - return queryInfo; + private static String getSystemStreamName(RelNode relNode) { + return relNode.getTable().getQualifiedName().stream().map(Object::toString).collect(Collectors.joining(".")); } public Collection<UdfMetadata> getUdfMetadata() { @@ -244,6 +247,10 @@ public class SamzaSqlApplicationConfig { return outputSystemStreamConfigsBySource; } + public Map<String, SqlIOConfig> getSystemStreamConfigsBySource() { + return systemStreamConfigsBySource; + } + public Map<String, SamzaRelConverter> getSamzaRelConverters() { return samzaRelConvertersBySource; } http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java 
---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java index 027fd23..cad032f 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java @@ -21,20 +21,21 @@ package org.apache.samza.sql.runner; import java.time.Duration; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.commons.lang3.Validate; import org.apache.samza.config.Config; import org.apache.samza.config.MapConfig; import org.apache.samza.job.ApplicationStatus; -import org.apache.samza.metrics.MetricsReporter; import org.apache.samza.runtime.ApplicationRunner; import org.apache.samza.runtime.ApplicationRunners; import org.apache.samza.runtime.LocalApplicationRunner; import org.apache.samza.runtime.RemoteApplicationRunner; +import org.apache.samza.sql.dsl.SamzaSqlDslConverter; import org.apache.samza.sql.interfaces.SqlIOConfig; import org.apache.samza.sql.interfaces.SqlIOResolver; -import org.apache.samza.sql.testutil.SamzaSqlQueryParser; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,25 +64,31 @@ public class SamzaSqlApplicationRunner implements ApplicationRunner { public static Config computeSamzaConfigs(Boolean localRunner, Config config) { Map<String, String> newConfig = new HashMap<>(); - SqlIOResolver ioResolver = SamzaSqlApplicationConfig.createIOResolver(config); - // Parse the sql and find the input stream streams - List<String> sqlStmts = SamzaSqlApplicationConfig.fetchSqlFromConfig(config); + // TODO: Introduce an API to return a dsl string containing one or more sql statements + List<String> dslStmts = SamzaSqlDslConverter.fetchSqlFromConfig(config); // This is needed because 
the SQL file may not be available in all the node managers. - String sqlJson = SamzaSqlApplicationConfig.serializeSqlStmts(sqlStmts); + String sqlJson = SamzaSqlApplicationConfig.serializeSqlStmts(dslStmts); newConfig.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, sqlJson); - List<SamzaSqlQueryParser.QueryInfo> queryInfo = SamzaSqlApplicationConfig.fetchQueryInfo(sqlStmts); - for (SamzaSqlQueryParser.QueryInfo query : queryInfo) { - // Populate stream to system mapping config for input and output system streams - for (String inputSource : query.getSources()) { - SqlIOConfig inputSystemStreamConfig = ioResolver.fetchSourceInfo(inputSource); - newConfig.put(String.format(CFG_FMT_SAMZA_STREAM_SYSTEM, inputSystemStreamConfig.getStreamName()), - inputSystemStreamConfig.getSystemName()); - newConfig.putAll(inputSystemStreamConfig.getConfig()); - } - - SqlIOConfig outputSystemStreamConfig = ioResolver.fetchSinkInfo(query.getSink()); + Set<String> inputSystemStreams = new HashSet<>(); + Set<String> outputSystemStreams = new HashSet<>(); + + SamzaSqlApplicationConfig.populateSystemStreamsAndGetRelRoots(dslStmts, config, + inputSystemStreams, outputSystemStreams); + + SqlIOResolver ioResolver = SamzaSqlApplicationConfig.createIOResolver(config); + + // Populate stream to system mapping config for input and output system streams + for (String source : inputSystemStreams) { + SqlIOConfig inputSystemStreamConfig = ioResolver.fetchSourceInfo(source); + newConfig.put(String.format(CFG_FMT_SAMZA_STREAM_SYSTEM, inputSystemStreamConfig.getStreamName()), + inputSystemStreamConfig.getSystemName()); + newConfig.putAll(inputSystemStreamConfig.getConfig()); + } + + for (String sink : outputSystemStreams) { + SqlIOConfig outputSystemStreamConfig = ioResolver.fetchSinkInfo(sink); newConfig.put(String.format(CFG_FMT_SAMZA_STREAM_SYSTEM, outputSystemStreamConfig.getStreamName()), outputSystemStreamConfig.getSystemName()); newConfig.putAll(outputSystemStreamConfig.getConfig()); 
http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-sql/src/main/java/org/apache/samza/sql/testutil/SamzaSqlQueryParser.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/testutil/SamzaSqlQueryParser.java b/samza-sql/src/main/java/org/apache/samza/sql/testutil/SamzaSqlQueryParser.java index 39ea092..643c82f 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/testutil/SamzaSqlQueryParser.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/testutil/SamzaSqlQueryParser.java @@ -24,9 +24,11 @@ import java.sql.DriverManager; import java.sql.SQLException; import java.util.ArrayList; import java.util.List; +import java.util.Properties; import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.apache.calcite.adapter.java.JavaTypeFactory; import org.apache.calcite.config.Lex; import org.apache.calcite.jdbc.CalciteConnection; import org.apache.calcite.plan.Contexts; @@ -49,6 +51,8 @@ import org.apache.calcite.tools.FrameworkConfig; import org.apache.calcite.tools.Frameworks; import org.apache.calcite.tools.Planner; import org.apache.samza.SamzaException; +import org.apache.samza.sql.interfaces.SamzaSqlDriver; +import org.apache.samza.sql.interfaces.SamzaSqlJavaTypeFactoryImpl; /** @@ -63,11 +67,13 @@ public class SamzaSqlQueryParser { private final List<String> sources; private String selectQuery; private String sink; + private String sql; - public QueryInfo(String selectQuery, List<String> sources, String sink) { + public QueryInfo(String selectQuery, List<String> sources, String sink, String sql) { this.selectQuery = selectQuery; this.sink = sink; this.sources = sources; + this.sql = sql; } public List<String> getSources() { @@ -81,6 +87,10 @@ public class SamzaSqlQueryParser { public String getSink() { return sink; } + + public String getSql() { + return sql; + } } public static QueryInfo parseQuery(String sql) { @@ -116,14 +126,18 @@ public class 
SamzaSqlQueryParser { throw new SamzaException("Sql query is not of the expected format"); } - return new QueryInfo(selectQuery, sources, sink); + return new QueryInfo(selectQuery, sources, sink, sql); } private static Planner createPlanner() { Connection connection; SchemaPlus rootSchema; try { - connection = DriverManager.getConnection("jdbc:calcite:"); + JavaTypeFactory typeFactory = new SamzaSqlJavaTypeFactoryImpl(); + SamzaSqlDriver driver = new SamzaSqlDriver(typeFactory); + DriverManager.deregisterDriver(DriverManager.getDriver("jdbc:calcite:")); + DriverManager.registerDriver(driver); + connection = driver.connect("jdbc:calcite:", new Properties()); CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class); rootSchema = calciteConnection.getRootSchema(); } catch (SQLException e) { @@ -174,7 +188,6 @@ public class SamzaSqlQueryParser { getSource(basicCall.operand(0), sourceList); } else if (basicCall.getOperator() instanceof SqlUnnestOperator && basicCall.operand(0) instanceof SqlSelect) { sourceList.addAll(getSourcesFromSelectQuery(basicCall.operand(0))); - return; } } else if (node instanceof SqlSelect) { getSource(((SqlSelect) node).getFrom(), sourceList); http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java index 7071b39..ac2c64d 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java @@ -127,6 +127,7 @@ class JoinTranslator { "stream_" + joinId) .map(KV::getValue) .join(table, joinFn); + // MessageStream<SamzaSqlRelMessage> outputStream = inputStream.join(table, joinFn); 
context.registerMessageStream(join.getId(), outputStream); } http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-sql/src/main/java/org/apache/samza/sql/translator/ModifyTranslator.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ModifyTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ModifyTranslator.java new file mode 100644 index 0000000..965338f --- /dev/null +++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ModifyTranslator.java @@ -0,0 +1,117 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. 
+*/ + +package org.apache.samza.sql.translator; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import org.apache.calcite.rel.core.TableModify; +import org.apache.commons.lang.Validate; +import org.apache.samza.SamzaException; +import org.apache.samza.application.StreamApplicationDescriptor; +import org.apache.samza.config.Config; +import org.apache.samza.operators.KV; +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.MessageStreamImpl; +import org.apache.samza.operators.TableDescriptor; +import org.apache.samza.operators.descriptors.DelegatingSystemDescriptor; +import org.apache.samza.operators.descriptors.GenericOutputDescriptor; +import org.apache.samza.operators.functions.MapFunction; +import org.apache.samza.serializers.KVSerde; +import org.apache.samza.serializers.NoOpSerde; +import org.apache.samza.sql.data.SamzaSqlRelMessage; +import org.apache.samza.sql.interfaces.SamzaRelConverter; +import org.apache.samza.sql.interfaces.SqlIOConfig; +import org.apache.samza.table.Table; +import org.apache.samza.task.TaskContext; + + +/** + * Translator to translate the TableModify in relational graph to the corresponding output streams in the StreamGraph + * implementation + */ +class ModifyTranslator { + + private final Map<String, SamzaRelConverter> relMsgConverters; + private final Map<String, SqlIOConfig> systemStreamConfig; + + ModifyTranslator(Map<String, SamzaRelConverter> converters, Map<String, SqlIOConfig> ssc) { + relMsgConverters = converters; + this.systemStreamConfig = ssc; + } + + // OutputMapFunction converts SamzaSqlRelMessage to SamzaMessage in KV format + private static class OutputMapFunction implements MapFunction<SamzaSqlRelMessage, KV<Object, Object>> { + // All the user-supplied functions are expected to be serializable in order to enable full serialization of user + // DAG. 
We do not want to serialize samzaMsgConverter as it can be fully constructed during stream operator + // initialization. + private transient SamzaRelConverter samzaMsgConverter; + private final String outputTopic; + + OutputMapFunction(String outputTopic) { + this.outputTopic = outputTopic; + } + + @Override + public void init(Config config, TaskContext taskContext) { + TranslatorContext context = (TranslatorContext) taskContext.getUserContext(); + this.samzaMsgConverter = context.getMsgConverter(outputTopic); + } + + @Override + public KV<Object, Object> apply(SamzaSqlRelMessage message) { + return this.samzaMsgConverter.convertToSamzaMessage(message); + } + } + + void translate(final TableModify tableModify, final TranslatorContext context) { + StreamApplicationDescriptor streamAppDesc = context.getStreamAppDescriptor(); + List<String> tableNameParts = tableModify.getTable().getQualifiedName(); + String targetName = SqlIOConfig.getSourceFromSourceParts(tableNameParts); + + Validate.isTrue(relMsgConverters.containsKey(targetName), String.format("Unknown source %s", targetName)); + + SqlIOConfig sinkConfig = systemStreamConfig.get(targetName); + + final String systemName = sinkConfig.getSystemName(); + final String streamName = sinkConfig.getStreamName(); + + KVSerde<Object, Object> noOpKVSerde = KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()); + DelegatingSystemDescriptor + sd = context.getSystemDescriptors().computeIfAbsent(systemName, DelegatingSystemDescriptor::new); + GenericOutputDescriptor<KV<Object, Object>> osd = sd.getOutputDescriptor(streamName, noOpKVSerde); + + MessageStreamImpl<SamzaSqlRelMessage> stream = + (MessageStreamImpl<SamzaSqlRelMessage>) context.getMessageStream(tableModify.getInput().getId()); + MessageStream<KV<Object, Object>> outputStream = stream.map(new OutputMapFunction(targetName)); + + Optional<TableDescriptor> tableDescriptor = sinkConfig.getTableDescriptor(); + if (!tableDescriptor.isPresent()) { + 
outputStream.sendTo(streamAppDesc.getOutputStream(osd)); + } else { + Table outputTable = streamAppDesc.getTable(tableDescriptor.get()); + if (outputTable == null) { + String msg = "Failed to obtain table descriptor of " + sinkConfig.getSource(); + throw new SamzaException(msg); + } + outputStream.sendTo(outputTable); + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java index fe4d8da..3a35b97 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java @@ -20,10 +20,10 @@ package org.apache.samza.sql.translator; import java.util.Map; -import java.util.Optional; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.RelRoot; import org.apache.calcite.rel.RelShuttleImpl; +import org.apache.calcite.rel.core.TableModify; import org.apache.calcite.rel.core.TableScan; import org.apache.calcite.rel.logical.LogicalAggregate; import org.apache.calcite.rel.logical.LogicalFilter; @@ -33,27 +33,13 @@ import org.apache.samza.SamzaException; import org.apache.samza.application.StreamApplicationDescriptor; import org.apache.samza.config.Config; import org.apache.samza.operators.ContextManager; -import org.apache.samza.operators.KV; -import org.apache.samza.operators.MessageStream; -import org.apache.samza.operators.MessageStreamImpl; -import org.apache.samza.operators.TableDescriptor; -import org.apache.samza.operators.functions.MapFunction; -import org.apache.samza.operators.descriptors.GenericOutputDescriptor;; -import org.apache.samza.operators.descriptors.DelegatingSystemDescriptor; -import 
org.apache.samza.serializers.KVSerde; -import org.apache.samza.serializers.NoOpSerde; import org.apache.samza.sql.data.SamzaSqlExecutionContext; -import org.apache.samza.sql.data.SamzaSqlRelMessage; import org.apache.samza.sql.interfaces.SamzaRelConverter; -import org.apache.samza.sql.interfaces.SqlIOConfig; import org.apache.samza.sql.interfaces.SqlIOResolver; import org.apache.samza.sql.planner.QueryPlanner; import org.apache.samza.sql.runner.SamzaSqlApplicationConfig; import org.apache.samza.sql.testutil.SamzaSqlQueryParser; -import org.apache.samza.table.Table; import org.apache.samza.task.TaskContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** @@ -62,54 +48,56 @@ import org.slf4j.LoggerFactory; * It then walks the relational graph and then populates the Samza's {@link StreamApplicationDescriptor} accordingly. */ public class QueryTranslator { - private static final Logger LOG = LoggerFactory.getLogger(QueryTranslator.class); - private final ScanTranslator scanTranslator; + private final ModifyTranslator modifyTranslator; private final SamzaSqlApplicationConfig sqlConfig; private final Map<String, SamzaRelConverter> converters; - private static class OutputMapFunction implements MapFunction<SamzaSqlRelMessage, KV<Object, Object>> { - private transient SamzaRelConverter samzaMsgConverter; - private final String outputTopic; - - OutputMapFunction(String outputTopic) { - this.outputTopic = outputTopic; - } - - @Override - public void init(Config config, TaskContext taskContext) { - TranslatorContext context = (TranslatorContext) taskContext.getUserContext(); - this.samzaMsgConverter = context.getMsgConverter(outputTopic); - } - - @Override - public KV<Object, Object> apply(SamzaSqlRelMessage message) { - return this.samzaMsgConverter.convertToSamzaMessage(message); - } - } - public QueryTranslator(SamzaSqlApplicationConfig sqlConfig) { this.sqlConfig = sqlConfig; scanTranslator = new ScanTranslator(sqlConfig.getSamzaRelConverters(), 
sqlConfig.getInputSystemStreamConfigBySource()); + modifyTranslator = + new ModifyTranslator(sqlConfig.getSamzaRelConverters(), sqlConfig.getOutputSystemStreamConfigsBySource()); this.converters = sqlConfig.getSamzaRelConverters(); } public void translate(SamzaSqlQueryParser.QueryInfo queryInfo, StreamApplicationDescriptor appDesc) { QueryPlanner planner = - new QueryPlanner(sqlConfig.getRelSchemaProviders(), sqlConfig.getInputSystemStreamConfigBySource(), + new QueryPlanner(sqlConfig.getRelSchemaProviders(), sqlConfig.getSystemStreamConfigsBySource(), sqlConfig.getUdfMetadata()); + final RelRoot relRoot = planner.plan(queryInfo.getSql()); + translate(relRoot, appDesc); + } + + public void translate(RelRoot relRoot, StreamApplicationDescriptor appDesc) { final SamzaSqlExecutionContext executionContext = new SamzaSqlExecutionContext(this.sqlConfig); - final RelRoot relRoot = planner.plan(queryInfo.getSelectQuery()); final TranslatorContext context = new TranslatorContext(appDesc, relRoot, executionContext, this.converters); - final RelNode node = relRoot.project(); final SqlIOResolver ioResolver = context.getExecutionContext().getSamzaSqlApplicationConfig().getIoResolver(); + final RelNode node = relRoot.project(); node.accept(new RelShuttleImpl() { int windowId = 0; int joinId = 0; @Override + public RelNode visit(RelNode relNode) { + if (relNode instanceof TableModify) { + return visit((TableModify) relNode); + } + return super.visit(relNode); + } + + private RelNode visit(TableModify modify) { + if (!modify.isInsert()) { + throw new SamzaException("Not a supported operation: " + modify.toString()); + } + RelNode node = super.visit(modify); + modifyTranslator.translate(modify, context); + return node; + } + + @Override public RelNode visit(TableScan scan) { RelNode node = super.visit(scan); scanTranslator.translate(scan, context); @@ -147,28 +135,6 @@ public class QueryTranslator { } }); - String sink = queryInfo.getSink(); - SqlIOConfig sinkConfig = 
sqlConfig.getOutputSystemStreamConfigsBySource().get(sink); - MessageStreamImpl<SamzaSqlRelMessage> stream = (MessageStreamImpl<SamzaSqlRelMessage>) context.getMessageStream(node.getId()); - MessageStream<KV<Object, Object>> outputStream = stream.map(new OutputMapFunction(sink)); - - Optional<TableDescriptor> tableDescriptor = sinkConfig.getTableDescriptor(); - if (!tableDescriptor.isPresent()) { - KVSerde<Object, Object> noOpKVSerde = KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()); - String systemName = sinkConfig.getSystemName(); - DelegatingSystemDescriptor sd = context.getSystemDescriptors().computeIfAbsent(systemName, DelegatingSystemDescriptor::new); - GenericOutputDescriptor<KV<Object, Object>> osd = sd.getOutputDescriptor(sinkConfig.getStreamName(), noOpKVSerde); - outputStream.sendTo(appDesc.getOutputStream(osd)); - } else { - Table outputTable = appDesc.getTable(tableDescriptor.get()); - if (outputTable == null) { - String msg = "Failed to obtain table descriptor of " + sinkConfig.getSource(); - LOG.error(msg); - throw new SamzaException(msg); - } - outputStream.sendTo(outputTable); - } - appDesc.withContextManager(new ContextManager() { @Override public void init(Config config, TaskContext taskContext) { http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java index 2dc28be..771a5d5 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java @@ -27,15 +27,15 @@ import org.apache.samza.application.StreamApplicationDescriptor; import org.apache.samza.config.Config; import org.apache.samza.operators.KV; import 
org.apache.samza.operators.MessageStream; -import org.apache.samza.operators.descriptors.GenericInputDescriptor; import org.apache.samza.operators.descriptors.DelegatingSystemDescriptor; +import org.apache.samza.operators.descriptors.GenericInputDescriptor; import org.apache.samza.operators.functions.MapFunction; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.NoOpSerde; import org.apache.samza.sql.data.SamzaSqlRelMessage; import org.apache.samza.sql.interfaces.SamzaRelConverter; -import org.apache.samza.sql.interfaces.SqlIOConfig; import org.apache.samza.task.TaskContext; +import org.apache.samza.sql.interfaces.SqlIOConfig; /** @@ -53,6 +53,9 @@ class ScanTranslator { } private static class ScanMapFunction implements MapFunction<KV<Object, Object>, SamzaSqlRelMessage> { + // All the user-supplied functions are expected to be serializable in order to enable full serialization of user + // DAG. We do not want to serialize samzaMsgConverter as it can be fully constructed during stream operator + // initialization. 
private transient SamzaRelConverter msgConverter; private final String streamName; @@ -83,7 +86,8 @@ class ScanTranslator { final String streamName = sqlIOConfig.getStreamName(); KVSerde<Object, Object> noOpKVSerde = KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()); - DelegatingSystemDescriptor sd = context.getSystemDescriptors().computeIfAbsent(systemName, DelegatingSystemDescriptor::new); + DelegatingSystemDescriptor + sd = context.getSystemDescriptors().computeIfAbsent(systemName, DelegatingSystemDescriptor::new); GenericInputDescriptor<KV<Object, Object>> isd = sd.getInputDescriptor(streamName, noOpKVSerde); MessageStream<KV<Object, Object>> inputStream = streamAppDesc.getInputStream(isd); MessageStream<SamzaSqlRelMessage> samzaSqlRelMessageStream = inputStream.map(new ScanMapFunction(sourceName)); http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlTable.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlTable.java b/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlTable.java index cc339f1..2005c21 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlTable.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlTable.java @@ -42,7 +42,7 @@ public class TestSamzaSqlTable { Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages); - String sql1 = "Insert into testDb.testTable.`$table` select id, name from testavro.SIMPLE1"; + String sql1 = "Insert into testDb.testTable.`$table`(id,name) select id, name from testavro.SIMPLE1"; List<String> sqlStmts = Arrays.asList(sql1); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); SamzaSqlApplicationRunner appRunnable = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); @@ -58,7 +58,7 @@ public class 
TestSamzaSqlTable { TestIOResolverFactory.TestTable.records.clear(); Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages); - String sql1 = "Insert into testDb.testTable.`$table` select id __key__, name from testavro.SIMPLE1"; + String sql1 = "Insert into testDb.testTable.`$table`(id,name) select id __key__, name from testavro.SIMPLE1"; List<String> sqlStmts = Arrays.asList(sql1); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); SamzaSqlApplicationRunner appRunnable = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java index dda0e14..46c0651 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java @@ -20,19 +20,25 @@ package org.apache.samza.sql.runner; import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; import org.apache.samza.SamzaException; import org.apache.samza.config.MapConfig; import org.apache.samza.sql.impl.ConfigBasedUdfResolver; import org.apache.samza.sql.interfaces.SqlIOConfig; import org.apache.samza.sql.testutil.JsonUtil; +import org.apache.samza.sql.testutil.SamzaSqlQueryParser; import org.apache.samza.sql.testutil.SamzaSqlTestConfig; import org.junit.Assert; import org.junit.Test; +import static org.apache.samza.sql.dsl.SamzaSqlDslConverter.*; +import static 
org.apache.samza.sql.runner.SamzaSqlApplicationConfig.*; + public class TestSamzaSqlApplicationConfig { @@ -42,8 +48,14 @@ public class TestSamzaSqlApplicationConfig { config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, "Insert into testavro.COMPLEX1 select * from testavro.SIMPLE1"); String configUdfResolverDomain = String.format(SamzaSqlApplicationConfig.CFG_FMT_UDF_RESOLVER_DOMAIN, "config"); int numUdfs = config.get(configUdfResolverDomain + ConfigBasedUdfResolver.CFG_UDF_CLASSES).split(",").length; - SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); - Assert.assertEquals(1, samzaSqlApplicationConfig.getQueryInfo().size()); + + List<String> sqlStmts = fetchSqlFromConfig(config); + List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts); + SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream) + .collect(Collectors.toSet()), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet())); + Assert.assertEquals(numUdfs, samzaSqlApplicationConfig.getUdfMetadata().size()); Assert.assertEquals(1, samzaSqlApplicationConfig.getInputSystemStreamConfigBySource().size()); Assert.assertEquals(1, samzaSqlApplicationConfig.getOutputSystemStreamConfigsBySource().size()); @@ -54,17 +66,23 @@ public class TestSamzaSqlApplicationConfig { Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10); - try { // Fail because no SQL config - new SamzaSqlApplicationConfig(new MapConfig(config)); + fetchSqlFromConfig(config); Assert.fail(); } catch (SamzaException e) { } // Pass config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, "Insert into testavro.COMPLEX1 select * from testavro.SIMPLE1"); - new SamzaSqlApplicationConfig(new MapConfig(config)); + + List<String> sqlStmts = fetchSqlFromConfig(config); + 
List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts); + new SamzaSqlApplicationConfig(new MapConfig(config), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream) + .collect(Collectors.toSet()), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet())); + testWithoutConfigShouldFail(config, SamzaSqlApplicationConfig.CFG_IO_RESOLVER); testWithoutConfigShouldFail(config, SamzaSqlApplicationConfig.CFG_UDF_RESOLVER); @@ -85,7 +103,12 @@ public class TestSamzaSqlApplicationConfig { "insert into testavro.Profile select * from testavro.SIMPLE1"); Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10); config.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); + + List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts); + SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream) + .collect(Collectors.toSet()), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet())); Set<String> inputKeys = samzaSqlApplicationConfig.getInputSystemStreamConfigBySource().keySet(); Set<String> outputKeys = samzaSqlApplicationConfig.getOutputSystemStreamConfigsBySource().keySet(); @@ -99,14 +122,24 @@ public class TestSamzaSqlApplicationConfig { private void testWithoutConfigShouldPass(Map<String, String> config, String configKey) { Map<String, String> badConfigs = new HashMap<>(config); badConfigs.remove(configKey); - new SamzaSqlApplicationConfig(new MapConfig(badConfigs)); + List<String> sqlStmts = fetchSqlFromConfig(badConfigs); + List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts); + new 
SamzaSqlApplicationConfig(new MapConfig(badConfigs), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream) + .collect(Collectors.toSet()), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet())); } private void testWithoutConfigShouldFail(Map<String, String> config, String configKey) { Map<String, String> badConfigs = new HashMap<>(config); badConfigs.remove(configKey); try { - new SamzaSqlApplicationConfig(new MapConfig(badConfigs)); + List<String> sqlStmts = fetchSqlFromConfig(badConfigs); + List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts); + new SamzaSqlApplicationConfig(new MapConfig(badConfigs), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream) + .collect(Collectors.toSet()), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet())); Assert.fail(); } catch (IllegalArgumentException e) { // swallow http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java index 9fab5d5..1ac804e 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java @@ -36,7 +36,7 @@ public class TestSamzaSqlApplicationRunner { @Test public void testComputeSamzaConfigs() { Map<String, String> configs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10); - String sql1 = "Insert into testavro.outputTopic select id, MyTest(id) as long_value from testavro.SIMPLE1"; + String sql1 = "Insert into testavro.outputTopic(id,long_value) select 
id, MyTest(id) as long_value from testavro.SIMPLE1"; configs.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql1); configs.put(SamzaSqlApplicationRunner.RUNNER_CONFIG, SamzaSqlApplicationRunner.class.getName()); MapConfig samzaConfig = new MapConfig(configs); http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java index 676781c..458196f 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java @@ -31,7 +31,6 @@ import java.util.stream.IntStream; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; -import org.apache.calcite.avatica.util.ByteString; import org.apache.samza.config.Config; import org.apache.samza.metrics.MetricsRegistry; import org.apache.samza.sql.avro.schemas.AddressRecord; @@ -50,7 +49,6 @@ import org.apache.samza.system.SystemAdmin; import org.apache.samza.system.SystemConsumer; import org.apache.samza.system.SystemFactory; import org.apache.samza.system.SystemProducer; -import org.apache.samza.system.SystemStream; import org.apache.samza.system.SystemStreamPartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,6 +71,7 @@ public class TestAvroSystemFactory implements SystemFactory { public static final byte[] DEFAULT_TRACKING_ID_BYTES = {76, 75, -24, 10, 33, -117, 24, -52, -110, -39, -5, 102, 65, 57, -62, -1}; + public static List<OutgoingMessageEnvelope> messages = new ArrayList<>(); public static List<String> getPageKeyProfileNameJoin(int numMessages) { 
http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java index 14e2243..a96fd08 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java @@ -128,6 +128,9 @@ public class SamzaSqlTestConfig { "testavro", "SIMPLE1"), SimpleRecord.SCHEMA$.toString()); staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA, + "testavro", "simpleOutputTopic"), SimpleRecord.SCHEMA$.toString()); + + staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA, "testavro", "outputTopic"), ComplexRecord.SCHEMA$.toString()); staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA, http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlFileParser.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlFileParser.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlFileParser.java index 1723e0e..a84f347 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlFileParser.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlFileParser.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.io.PrintWriter; import java.util.List; +import org.apache.samza.sql.testutil.SqlFileParser; import org.junit.Assert; import 
org.junit.Test;
