Repository: samza Updated Branches: refs/heads/master 5069f1ddb -> ff607cb6b
SAMZA-2007: Samza-sql - Fix Samza-SQL to not expect LogicalTableModify in the Calcite plan. This is required for supporting schema evolution without failing the jobs. Author: Aditya Toomula <[email protected]> Reviewers: Srinivasulu Punuru <[email protected]> Closes #821 from atoomula/modify and squashes the following commits: 17b4b1c1 [Aditya Toomula] SAMZA-2007: Samza-sql - Fix Samza-SQL to not expect LogicalTableModify in the Calcite plan. 65be581a [Aditya Toomula] SAMZA-2007: Samza-sql - Fix Samza-SQL to not expect LogicalTableModify in the Calcite plan. fb50ee81 [Aditya Toomula] SAMZA-2007: Samza-sql - Fix Samza-SQL to not expect LogicalTableModify in the Calcite plan. 9fff9573 [Aditya Toomula] SAMZA-2007: Samza-sql - Fix Samza-SQL to not expect LogicalTableModify in the Calcite plan. f3e887c6 [Aditya Toomula] dummy Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/ff607cb6 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/ff607cb6 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/ff607cb6 Branch: refs/heads/master Commit: ff607cb6b61144ea6feb28842318e1eb1e5eaa5a Parents: 5069f1d Author: Aditya Toomula <[email protected]> Authored: Wed Nov 28 15:53:30 2018 -0800 Committer: Srinivasulu Punuru <[email protected]> Committed: Wed Nov 28 15:53:30 2018 -0800 ---------------------------------------------------------------------- .../samza/sql/dsl/SamzaSqlDslConverter.java | 16 +- .../samza/sql/runner/SamzaSqlApplication.java | 10 +- .../sql/runner/SamzaSqlApplicationConfig.java | 60 ++--- .../sql/runner/SamzaSqlApplicationRunner.java | 7 +- .../samza/sql/translator/ModifyTranslator.java | 127 ----------- .../samza/sql/translator/QueryTranslator.java | 52 ++--- .../runner/TestSamzaSqlApplicationConfig.java | 55 ++++- .../sql/translator/TestQueryTranslator.java | 221 +++++++++++++++---- .../test/samzasql/TestSamzaSqlEndToEnd.java | 4 +- 9 files changed, 295 insertions(+), 257 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/ff607cb6/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverter.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverter.java b/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverter.java index 6cce906..d4cb134 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverter.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverter.java @@ -55,22 +55,20 @@ public class SamzaSqlDslConverter implements DslConverter { List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts); SamzaSqlApplicationConfig sqlConfig = new SamzaSqlApplicationConfig(config, queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream) - .collect(Collectors.toSet()), - queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet())); + .collect(Collectors.toList()), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList())); QueryPlanner planner = - new QueryPlanner(sqlConfig.getRelSchemaProviders(), sqlConfig.getSystemStreamConfigsBySource(), + new QueryPlanner(sqlConfig.getRelSchemaProviders(), sqlConfig.getInputSystemStreamConfigBySource(), sqlConfig.getUdfMetadata()); List<RelRoot> relRoots = new LinkedList<>(); for (String sql: sqlStmts) { - // when sql is a query, we only pass the select query to the planner + // we always pass only select query to the planner for samza sql. The reason is that samza sql supports + // schema evolution where source and destination could up to an extent have independent schema evolution while + // calcite expects strict conformance of the destination schema with that of the fields in the select query. 
SamzaSqlQueryParser.QueryInfo qinfo = SamzaSqlQueryParser.parseQuery(sql); - if (qinfo.getSink().split("\\.")[0].equals(SamzaSqlApplicationConfig.SAMZA_SYSTEM_LOG)) { - sql = qinfo.getSelectQuery(); - } - - relRoots.add(planner.plan(sql)); + relRoots.add(planner.plan(qinfo.getSelectQuery())); } return relRoots; } http://git-wip-us.apache.org/repos/asf/samza/blob/ff607cb6/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java index b8bb190..868f5a2 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java @@ -21,10 +21,9 @@ package org.apache.samza.sql.runner; import java.util.Collection; import java.util.HashMap; -import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Set; import org.apache.calcite.rel.RelRoot; import org.apache.samza.application.StreamApplication; import org.apache.samza.application.descriptors.StreamApplicationDescriptor; @@ -52,8 +51,8 @@ public class SamzaSqlApplication implements StreamApplication { Map<Integer, TranslatorContext> translatorContextMap = new HashMap<>(); // 1. Get Calcite plan - Set<String> inputSystemStreams = new HashSet<>(); - Set<String> outputSystemStreams = new HashSet<>(); + List<String> inputSystemStreams = new LinkedList<>(); + List<String> outputSystemStreams = new LinkedList<>(); Collection<RelRoot> relRoots = SamzaSqlApplicationConfig.populateSystemStreamsAndGetRelRoots(dslStmts, appDescriptor.getConfig(), @@ -66,12 +65,13 @@ public class SamzaSqlApplication implements StreamApplication { // 3. 
Translate Calcite plan to Samza stream operators QueryTranslator queryTranslator = new QueryTranslator(appDescriptor, sqlConfig); SamzaSqlExecutionContext executionContext = new SamzaSqlExecutionContext(sqlConfig); + // QueryId implies the index of the query in multiple query statements scenario. It should always start with 0. int queryId = 0; for (RelRoot relRoot : relRoots) { LOG.info("Translating relRoot {} to samza stream graph with queryId {}", relRoot, queryId); TranslatorContext translatorContext = new TranslatorContext(appDescriptor, relRoot, executionContext); translatorContextMap.put(queryId, translatorContext); - queryTranslator.translate(relRoot, translatorContext, queryId); + queryTranslator.translate(relRoot, sqlConfig.getOutputSystemStreams().get(queryId), translatorContext, queryId); queryId++; } http://git-wip-us.apache.org/repos/asf/samza/blob/ff607cb6/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java index 6e12c02..e608794 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java @@ -23,6 +23,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Properties; @@ -106,23 +107,33 @@ public class SamzaSqlApplicationConfig { private final Map<String, SqlIOConfig> inputSystemStreamConfigBySource; private final Map<String, SqlIOConfig> outputSystemStreamConfigsBySource; - private final Map<String, SqlIOConfig> systemStreamConfigsBySource; + + // There could only be one output system stream per samza sql 
statement. The below list datastructure stores the + // output system streams in the order of SQL query statements. Please note that there could be duplicate entries + // in it during a fan-in scenario (e.g. two sql statements with two different input streams but same output stream). + private final List<String> outputSystemStreams; private final String metadataTopicPrefix; private final long windowDurationMs; - public SamzaSqlApplicationConfig(Config staticConfig, Set<String> inputSystemStreams, - Set<String> outputSystemStreams) { + public SamzaSqlApplicationConfig(Config staticConfig, List<String> inputSystemStreams, + List<String> outputSystemStreams) { ioResolver = createIOResolver(staticConfig); - inputSystemStreamConfigBySource = inputSystemStreams.stream() + this.outputSystemStreams = new LinkedList<>(outputSystemStreams); + + // There could be duplicate streams across different queries. Let's dedupe them. + Set<String> inputSystemStreamSet = new HashSet<>(inputSystemStreams); + Set<String> outputSystemStreamSet = new HashSet<>(outputSystemStreams); + + inputSystemStreamConfigBySource = inputSystemStreamSet.stream() .collect(Collectors.toMap(Function.identity(), src -> ioResolver.fetchSourceInfo(src))); - outputSystemStreamConfigsBySource = outputSystemStreams.stream() + outputSystemStreamConfigsBySource = outputSystemStreamSet.stream() .collect(Collectors.toMap(Function.identity(), x -> ioResolver.fetchSinkInfo(x))); - systemStreamConfigsBySource = new HashMap<>(inputSystemStreamConfigBySource); + Map<String, SqlIOConfig> systemStreamConfigsBySource = new HashMap<>(inputSystemStreamConfigBySource); systemStreamConfigsBySource.putAll(outputSystemStreamConfigsBySource); Set<SqlIOConfig> systemStreamConfigs = new HashSet<>(systemStreamConfigsBySource.values()); @@ -140,7 +151,7 @@ public class SamzaSqlApplicationConfig { relSchemaProvidersBySource.get(x.getSource()), c)))); samzaRelTableKeyConvertersBySource = systemStreamConfigs.stream() - .filter(config -> 
config.isRemoteTable()) + .filter(SqlIOConfig::isRemoteTable) .collect(Collectors.toMap(SqlIOConfig::getSource, x -> initializePlugin("SamzaRelTableKeyConverter", x.getSamzaRelTableKeyConverterName(), staticConfig, CFG_FMT_SAMZA_REL_TABLE_KEY_CONVERTER_DOMAIN, @@ -152,13 +163,6 @@ public class SamzaSqlApplicationConfig { metadataTopicPrefix = staticConfig.get(CFG_METADATA_TOPIC_PREFIX, DEFAULT_METADATA_TOPIC_PREFIX); windowDurationMs = staticConfig.getLong(CFG_GROUPBY_WINDOW_DURATION_MS, DEFAULT_GROUPBY_WINDOW_DURATION_MS); - - // remove the SqlIOConfigs of outputs whose system is "log" out of systemStreamConfigsBySource - outputSystemStreamConfigsBySource.forEach((k, v) -> { - if (k.split("\\.")[0].equals(SamzaSqlApplicationConfig.SAMZA_SYSTEM_LOG)) { - systemStreamConfigsBySource.remove(k); - } - }); } public static <T> T initializePlugin(String pluginName, String plugin, Config staticConfig, @@ -229,31 +233,35 @@ public class SamzaSqlApplicationConfig { } public static Collection<RelRoot> populateSystemStreamsAndGetRelRoots(List<String> dslStmts, Config config, - Set<String> inputSystemStreams, Set<String> outputSystemStreams) { + List<String> inputSystemStreams, List<String> outputSystemStreams) { // TODO: Get the converter factory based on the file type. Create abstraction around this. DslConverterFactory dslConverterFactory = new SamzaSqlDslConverterFactory(); DslConverter dslConverter = dslConverterFactory.create(config); Collection<RelRoot> relRoots = dslConverter.convertDsl(String.join("\n", dslStmts)); - // FIXME: the snippet below dose not work when sql is a query + // RelRoot does not have sink node for Samza SQL dsl, so we can not traverse the relRoot tree to get + // "outputSystemStreams" + // FIXME: the snippet below does not work for Samza SQL dsl but is required for other dsls. Future fix could be + // for samza sql to build TableModify for sink and stick it to the relRoot, so we could get output stream out of it. 
+ // for (RelRoot relRoot : relRoots) { // SamzaSqlApplicationConfig.populateSystemStreams(relRoot.project(), inputSystemStreams, outputSystemStreams); // } - // RelRoot does not have sink node (aka. log.outputStream) when Sql statement is a query, so we - // can not traverse the tree of relRoot to get "outputSystemStreams" + // The below code is specific to Samza SQL dsl and should be removed once Samza SQL includes sink as part of + // relRoot and the above code is uncommented. List<String> sqlStmts = SamzaSqlDslConverter.fetchSqlFromConfig(config); List<SamzaSqlQueryParser.QueryInfo> queryInfo = SamzaSqlDslConverter.fetchQueryInfo(sqlStmts); inputSystemStreams.addAll(queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream) - .collect(Collectors.toSet())); - outputSystemStreams.addAll(queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet())); + .collect(Collectors.toList())); + outputSystemStreams.addAll(queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList())); return relRoots; } - private static void populateSystemStreams(RelNode relNode, Set<String> inputSystemStreams, - Set<String> outputSystemStreams) { + private static void populateSystemStreams(RelNode relNode, List<String> inputSystemStreams, + List<String> outputSystemStreams) { if (relNode instanceof TableModify) { outputSystemStreams.add(getSystemStreamName(relNode)); } else { @@ -282,6 +290,10 @@ public class SamzaSqlApplicationConfig { return udfMetadata; } + public List<String> getOutputSystemStreams() { + return outputSystemStreams; + } + public Map<String, SqlIOConfig> getInputSystemStreamConfigBySource() { return inputSystemStreamConfigBySource; } @@ -290,10 +302,6 @@ public class SamzaSqlApplicationConfig { return outputSystemStreamConfigsBySource; } - public Map<String, SqlIOConfig> getSystemStreamConfigsBySource() { - return systemStreamConfigsBySource; - } - public Map<String, 
SamzaRelConverter> getSamzaRelConverters() { return samzaRelConvertersBySource; } http://git-wip-us.apache.org/repos/asf/samza/blob/ff607cb6/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java index 41e37f1..d9a44ec 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java @@ -21,10 +21,9 @@ package org.apache.samza.sql.runner; import java.time.Duration; import java.util.HashMap; -import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Set; import org.apache.commons.lang3.Validate; import org.apache.samza.application.SamzaApplication; import org.apache.samza.config.Config; @@ -83,8 +82,8 @@ public class SamzaSqlApplicationRunner implements ApplicationRunner { String sqlJson = SamzaSqlApplicationConfig.serializeSqlStmts(dslStmts); newConfig.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, sqlJson); - Set<String> inputSystemStreams = new HashSet<>(); - Set<String> outputSystemStreams = new HashSet<>(); + List<String> inputSystemStreams = new LinkedList<>(); + List<String> outputSystemStreams = new LinkedList<>(); SamzaSqlApplicationConfig.populateSystemStreamsAndGetRelRoots(dslStmts, config, inputSystemStreams, outputSystemStreams); http://git-wip-us.apache.org/repos/asf/samza/blob/ff607cb6/samza-sql/src/main/java/org/apache/samza/sql/translator/ModifyTranslator.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ModifyTranslator.java 
b/samza-sql/src/main/java/org/apache/samza/sql/translator/ModifyTranslator.java deleted file mode 100644 index dbeabab..0000000 --- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ModifyTranslator.java +++ /dev/null @@ -1,127 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. 
-*/ - -package org.apache.samza.sql.translator; - -import java.util.List; -import java.util.Map; -import java.util.Optional; -import org.apache.calcite.rel.core.TableModify; -import org.apache.commons.lang.Validate; -import org.apache.samza.SamzaException; -import org.apache.samza.application.descriptors.StreamApplicationDescriptor; -import org.apache.samza.context.Context; -import org.apache.samza.operators.OutputStream; -import org.apache.samza.system.descriptors.GenericOutputDescriptor; -import org.apache.samza.operators.KV; -import org.apache.samza.operators.MessageStream; -import org.apache.samza.operators.MessageStreamImpl; -import org.apache.samza.table.descriptors.TableDescriptor; -import org.apache.samza.system.descriptors.DelegatingSystemDescriptor; -import org.apache.samza.operators.functions.MapFunction; -import org.apache.samza.serializers.KVSerde; -import org.apache.samza.serializers.NoOpSerde; -import org.apache.samza.sql.data.SamzaSqlRelMessage; -import org.apache.samza.sql.interfaces.SamzaRelConverter; -import org.apache.samza.sql.interfaces.SqlIOConfig; -import org.apache.samza.sql.runner.SamzaSqlApplicationContext; -import org.apache.samza.table.Table; - - -/** - * Translator to translate the TableModify in relational graph to the corresponding output streams in the StreamGraph - * implementation - */ -class ModifyTranslator { - - private final Map<String, SamzaRelConverter> relMsgConverters; - private final Map<String, SqlIOConfig> systemStreamConfig; - private final int queryId; - - ModifyTranslator(Map<String, SamzaRelConverter> converters, Map<String, SqlIOConfig> ssc, int queryId) { - relMsgConverters = converters; - this.systemStreamConfig = ssc; - this.queryId = queryId; - } - - // OutputMapFunction converts SamzaSqlRelMessage to SamzaMessage in KV format - private static class OutputMapFunction implements MapFunction<SamzaSqlRelMessage, KV<Object, Object>> { - // All the user-supplied functions are expected to be serializable in order to 
enable full serialization of user - // DAG. We do not want to serialize samzaMsgConverter as it can be fully constructed during stream operator - // initialization. - private transient SamzaRelConverter samzaMsgConverter; - private final String outputTopic; - private final int queryId; - - OutputMapFunction(String outputTopic, int queryId) { - this.outputTopic = outputTopic; - this.queryId = queryId; - } - - @Override - public void init(Context context) { - TranslatorContext translatorContext = - ((SamzaSqlApplicationContext) context.getApplicationTaskContext()).getTranslatorContexts().get(queryId); - this.samzaMsgConverter = translatorContext.getMsgConverter(outputTopic); - } - - @Override - public KV<Object, Object> apply(SamzaSqlRelMessage message) { - return this.samzaMsgConverter.convertToSamzaMessage(message); - } - } - - void translate(final TableModify tableModify, final TranslatorContext context, Map<String, DelegatingSystemDescriptor> systemDescriptors, - Map<String, OutputStream> outputMsgStreams) { - - StreamApplicationDescriptor streamAppDesc = context.getStreamAppDescriptor(); - List<String> tableNameParts = tableModify.getTable().getQualifiedName(); - String targetName = SqlIOConfig.getSourceFromSourceParts(tableNameParts); - - Validate.isTrue(relMsgConverters.containsKey(targetName), String.format("Unknown source %s", targetName)); - - SqlIOConfig sinkConfig = systemStreamConfig.get(targetName); - - final String systemName = sinkConfig.getSystemName(); - final String streamId = sinkConfig.getStreamId(); - final String source = sinkConfig.getSource(); - - KVSerde<Object, Object> noOpKVSerde = KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()); - DelegatingSystemDescriptor - sd = systemDescriptors.computeIfAbsent(systemName, DelegatingSystemDescriptor::new); - GenericOutputDescriptor<KV<Object, Object>> osd = sd.getOutputDescriptor(streamId, noOpKVSerde); - - MessageStreamImpl<SamzaSqlRelMessage> stream = - (MessageStreamImpl<SamzaSqlRelMessage>) 
context.getMessageStream(tableModify.getInput().getId()); - MessageStream<KV<Object, Object>> outputStream = stream.map(new OutputMapFunction(targetName, queryId)); - - Optional<TableDescriptor> tableDescriptor = sinkConfig.getTableDescriptor(); - if (!tableDescriptor.isPresent()) { - OutputStream stm = outputMsgStreams.computeIfAbsent(source, v -> streamAppDesc.getOutputStream(osd)); - outputStream.sendTo(stm); - } else { - Table outputTable = streamAppDesc.getTable(tableDescriptor.get()); - if (outputTable == null) { - String msg = "Failed to obtain table descriptor of " + sinkConfig.getSource(); - throw new SamzaException(msg); - } - outputStream.sendTo(outputTable); - } - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/ff607cb6/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java index bb34a41..fef5471 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java @@ -32,6 +32,7 @@ import org.apache.calcite.rel.logical.LogicalAggregate; import org.apache.calcite.rel.logical.LogicalFilter; import org.apache.calcite.rel.logical.LogicalJoin; import org.apache.calcite.rel.logical.LogicalProject; +import org.apache.commons.lang.Validate; import org.apache.samza.SamzaException; import org.apache.samza.application.descriptors.StreamApplicationDescriptor; import org.apache.samza.context.Context; @@ -71,7 +72,6 @@ public class QueryTranslator { private transient SamzaRelConverter samzaMsgConverter; private final String outputTopic; private final int queryId; - static OutputStream logOutputStream; OutputMapFunction(String outputTopic, int queryId) { this.outputTopic = outputTopic; @@ -103,14 
+103,14 @@ public class QueryTranslator { * For unit testing only */ @VisibleForTesting - public void translate(SamzaSqlQueryParser.QueryInfo queryInfo, StreamApplicationDescriptor appDesc, int queryId) { + void translate(SamzaSqlQueryParser.QueryInfo queryInfo, StreamApplicationDescriptor appDesc, int queryId) { QueryPlanner planner = - new QueryPlanner(sqlConfig.getRelSchemaProviders(), sqlConfig.getSystemStreamConfigsBySource(), + new QueryPlanner(sqlConfig.getRelSchemaProviders(), sqlConfig.getInputSystemStreamConfigBySource(), sqlConfig.getUdfMetadata()); - final RelRoot relRoot = planner.plan(queryInfo.getSql()); + final RelRoot relRoot = planner.plan(queryInfo.getSelectQuery()); SamzaSqlExecutionContext executionContext = new SamzaSqlExecutionContext(sqlConfig); TranslatorContext translatorContext = new TranslatorContext(appDesc, relRoot, executionContext); - translate(relRoot, translatorContext, queryId); + translate(relRoot, sqlConfig.getOutputSystemStreams().get(queryId), translatorContext, queryId); Map<Integer, TranslatorContext> translatorContexts = new HashMap<>(); translatorContexts.put(queryId, translatorContext.clone()); appDesc.withApplicationTaskContextFactory((jobContext, @@ -120,12 +120,19 @@ public class QueryTranslator { new SamzaSqlApplicationContext(translatorContexts)); } - public void translate(RelRoot relRoot, TranslatorContext translatorContext, int queryId) { + /** + * Translate Calcite plan to Samza stream operators. + * @param relRoot Calcite plan in the form of {@link RelRoot}. RelRoot should not include the sink ({@link TableModify}) + * @param outputSystemStream Sink associated with the Calcite plan. + * @param translatorContext Context maintained across translations. + * @param queryId query index of the sql statement corresponding to the Calcite plan in multi SQL statement scenario + * starting with index 0. 
+ */ + public void translate(RelRoot relRoot, String outputSystemStream, TranslatorContext translatorContext, int queryId) { final RelNode node = relRoot.project(); + ScanTranslator scanTranslator = new ScanTranslator(sqlConfig.getSamzaRelConverters(), sqlConfig.getInputSystemStreamConfigBySource(), queryId); - ModifyTranslator modifyTranslator = - new ModifyTranslator(sqlConfig.getSamzaRelConverters(), sqlConfig.getOutputSystemStreamConfigsBySource(), queryId); node.accept(new RelShuttleImpl() { int windowId = 0; @@ -134,21 +141,11 @@ public class QueryTranslator { @Override public RelNode visit(RelNode relNode) { - if (relNode instanceof TableModify) { - return visit((TableModify) relNode); - } + // There should never be a TableModify in the calcite plan. + Validate.isTrue(!(relNode instanceof TableModify)); return super.visit(relNode); } - private RelNode visit(TableModify modify) { - if (!modify.isInsert()) { - throw new SamzaException("Not a supported operation: " + modify.toString()); - } - RelNode node = super.visit(modify); - modifyTranslator.translate(modify, translatorContext, systemDescriptors, outputMsgStreams); - return node; - } - @Override public RelNode visit(TableScan scan) { RelNode node = super.visit(scan); @@ -190,14 +187,7 @@ public class QueryTranslator { } }); - // the snippet below will be performed only when sql is a query statement - sqlConfig.getOutputSystemStreamConfigsBySource().keySet().forEach( - key -> { - if (key.split("\\.")[0].equals(SamzaSqlApplicationConfig.SAMZA_SYSTEM_LOG)) { - sendToOutputStream(key, streamAppDescriptor, translatorContext, node, queryId); - } - } - ); + sendToOutputStream(outputSystemStream, streamAppDescriptor, translatorContext, node, queryId); } private void sendToOutputStream(String sinkStream, StreamApplicationDescriptor appDesc, TranslatorContext context, RelNode node, int queryId) { @@ -211,10 +201,8 @@ public class QueryTranslator { DelegatingSystemDescriptor sd = 
systemDescriptors.computeIfAbsent(systemName, DelegatingSystemDescriptor::new); GenericOutputDescriptor<KV<Object, Object>> osd = sd.getOutputDescriptor(sinkConfig.getStreamId(), noOpKVSerde); - if (OutputMapFunction.logOutputStream == null) { - OutputMapFunction.logOutputStream = appDesc.getOutputStream(osd); - } - outputStream.sendTo(OutputMapFunction.logOutputStream); + OutputStream stm = outputMsgStreams.computeIfAbsent(sinkConfig.getSource(), v -> appDesc.getOutputStream(osd)); + outputStream.sendTo(stm); } else { Table outputTable = appDesc.getTable(tableDescriptor.get()); if (outputTable == null) { http://git-wip-us.apache.org/repos/asf/samza/blob/ff607cb6/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java index 46c0651..294eccd 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java @@ -37,7 +37,6 @@ import org.junit.Assert; import org.junit.Test; import static org.apache.samza.sql.dsl.SamzaSqlDslConverter.*; -import static org.apache.samza.sql.runner.SamzaSqlApplicationConfig.*; public class TestSamzaSqlApplicationConfig { @@ -53,8 +52,8 @@ public class TestSamzaSqlApplicationConfig { List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts); SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config), queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream) - .collect(Collectors.toSet()), - queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet())); + .collect(Collectors.toList()), + 
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList())); Assert.assertEquals(numUdfs, samzaSqlApplicationConfig.getUdfMetadata().size()); Assert.assertEquals(1, samzaSqlApplicationConfig.getInputSystemStreamConfigBySource().size()); @@ -80,8 +79,8 @@ public class TestSamzaSqlApplicationConfig { List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts); new SamzaSqlApplicationConfig(new MapConfig(config), queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream) - .collect(Collectors.toSet()), - queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet())); + .collect(Collectors.toList()), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList())); testWithoutConfigShouldFail(config, SamzaSqlApplicationConfig.CFG_IO_RESOLVER); testWithoutConfigShouldFail(config, SamzaSqlApplicationConfig.CFG_UDF_RESOLVER); @@ -98,7 +97,7 @@ public class TestSamzaSqlApplicationConfig { } @Test - public void testGetInputAndOutputStreamConfigs() { + public void testGetInputAndOutputStreamConfigsFanOut() { List<String> sqlStmts = Arrays.asList("Insert into testavro.COMPLEX1 select * from testavro.SIMPLE1", "insert into testavro.Profile select * from testavro.SIMPLE1"); Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10); @@ -107,16 +106,48 @@ public class TestSamzaSqlApplicationConfig { List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts); SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config), queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream) - .collect(Collectors.toSet()), - queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet())); + .collect(Collectors.toList()), + 
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList())); Set<String> inputKeys = samzaSqlApplicationConfig.getInputSystemStreamConfigBySource().keySet(); Set<String> outputKeys = samzaSqlApplicationConfig.getOutputSystemStreamConfigsBySource().keySet(); + List<String> outputStreamList = samzaSqlApplicationConfig.getOutputSystemStreams(); + Assert.assertEquals(1, inputKeys.size()); Assert.assertTrue(inputKeys.contains("testavro.SIMPLE1")); Assert.assertEquals(2, outputKeys.size()); Assert.assertTrue(outputKeys.contains("testavro.COMPLEX1")); Assert.assertTrue(outputKeys.contains("testavro.Profile")); + Assert.assertEquals(2, outputStreamList.size()); + Assert.assertEquals("testavro.COMPLEX1", outputStreamList.get(0)); + Assert.assertEquals("testavro.Profile", outputStreamList.get(1)); + } + + @Test + public void testGetInputAndOutputStreamConfigsFanIn() { + List<String> sqlStmts = Arrays.asList("Insert into testavro.COMPLEX1 select * from testavro.SIMPLE1", + "insert into testavro.COMPLEX1 select * from testavro.SIMPLE2"); + Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10); + config.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); + + List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts); + SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream) + .collect(Collectors.toList()), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList())); + + Set<String> inputKeys = samzaSqlApplicationConfig.getInputSystemStreamConfigBySource().keySet(); + Set<String> outputKeys = samzaSqlApplicationConfig.getOutputSystemStreamConfigsBySource().keySet(); + List<String> outputStreamList = samzaSqlApplicationConfig.getOutputSystemStreams(); + + Assert.assertEquals(2, inputKeys.size()); + 
Assert.assertTrue(inputKeys.contains("testavro.SIMPLE1")); + Assert.assertTrue(inputKeys.contains("testavro.SIMPLE2")); + Assert.assertEquals(1, outputKeys.size()); + Assert.assertTrue(outputKeys.contains("testavro.COMPLEX1")); + Assert.assertEquals(2, outputStreamList.size()); + Assert.assertEquals("testavro.COMPLEX1", outputStreamList.get(0)); + Assert.assertEquals("testavro.COMPLEX1", outputStreamList.get(1)); } private void testWithoutConfigShouldPass(Map<String, String> config, String configKey) { @@ -126,8 +157,8 @@ public class TestSamzaSqlApplicationConfig { List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts); new SamzaSqlApplicationConfig(new MapConfig(badConfigs), queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream) - .collect(Collectors.toSet()), - queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet())); + .collect(Collectors.toList()), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList())); } private void testWithoutConfigShouldFail(Map<String, String> config, String configKey) { @@ -138,8 +169,8 @@ public class TestSamzaSqlApplicationConfig { List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts); new SamzaSqlApplicationConfig(new MapConfig(badConfigs), queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream) - .collect(Collectors.toSet()), - queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet())); + .collect(Collectors.toList()), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList())); Assert.fail(); } catch (IllegalArgumentException e) { // swallow http://git-wip-us.apache.org/repos/asf/samza/blob/ff607cb6/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java ---------------------------------------------------------------------- diff --git 
a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java index 0c9091d..d9039ec 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java @@ -19,6 +19,7 @@ package org.apache.samza.sql.translator; +import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.List; @@ -34,6 +35,7 @@ import org.apache.samza.operators.spec.OperatorSpec; import org.apache.samza.sql.impl.ConfigBasedIOResolverFactory; import org.apache.samza.sql.runner.SamzaSqlApplicationConfig; import org.apache.samza.sql.runner.SamzaSqlApplicationRunner; +import org.apache.samza.sql.testutil.JsonUtil; import org.apache.samza.sql.testutil.SamzaSqlQueryParser; import org.apache.samza.sql.testutil.SamzaSqlTestConfig; import org.junit.Assert; @@ -65,8 +67,8 @@ public class TestQueryTranslator { List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts); SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config), queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream) - .collect(Collectors.toSet()), - queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet())); + .collect(Collectors.toList()), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList())); StreamApplicationDescriptorImpl appDesc = new StreamApplicationDescriptorImpl(streamApp -> { },samzaConfig); QueryTranslator translator = new QueryTranslator(appDesc, samzaSqlApplicationConfig); @@ -92,6 +94,145 @@ public class TestQueryTranslator { } @Test + public void testTranslateFanIn() { + Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10); + String sql1 = "Insert into 
testavro.simpleOutputTopic select * from testavro.SIMPLE2"; + String sql2 = "Insert into testavro.simpleOutputTopic select * from testavro.SIMPLE1"; + List<String> sqlStmts = Arrays.asList(sql1, sql2); + config.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); + Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); + + List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts); + SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream) + .collect(Collectors.toList()), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList())); + + StreamApplicationDescriptorImpl appDesc = new StreamApplicationDescriptorImpl(streamApp -> { },samzaConfig); + QueryTranslator translator = new QueryTranslator(appDesc, samzaSqlApplicationConfig); + + translator.translate(queryInfo.get(0), appDesc, 0); + translator.translate(queryInfo.get(1), appDesc, 1); + OperatorSpecGraph specGraph = appDesc.getOperatorSpecGraph(); + + StreamConfig streamConfig = new StreamConfig(samzaConfig); + String inputStreamId1 = specGraph.getInputOperators().keySet().stream().findFirst().get(); + String inputSystem1 = streamConfig.getSystem(inputStreamId1); + String inputPhysicalName1 = streamConfig.getPhysicalName(inputStreamId1); + String inputStreamId2 = specGraph.getInputOperators().keySet().stream().skip(1).findFirst().get(); + String inputSystem2 = streamConfig.getSystem(inputStreamId2); + String inputPhysicalName2 = streamConfig.getPhysicalName(inputStreamId2); + + String outputStreamId = specGraph.getOutputStreams().keySet().stream().findFirst().get(); + String outputSystem = streamConfig.getSystem(outputStreamId); + String outputPhysicalName = streamConfig.getPhysicalName(outputStreamId); + + Assert.assertEquals(1, 
specGraph.getOutputStreams().size()); + Assert.assertEquals("testavro", outputSystem); + Assert.assertEquals("simpleOutputTopic", outputPhysicalName); + + Assert.assertEquals(2, specGraph.getInputOperators().size()); + Assert.assertEquals("testavro", inputSystem1); + Assert.assertEquals("SIMPLE2", inputPhysicalName1); + Assert.assertEquals("testavro", inputSystem2); + Assert.assertEquals("SIMPLE1", inputPhysicalName2); + } + + @Test + public void testTranslateFanOut() { + Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10); + String sql1 = "Insert into testavro.SIMPLE2 select * from testavro.SIMPLE1"; + String sql2 = "Insert into testavro.SIMPLE3 select * from testavro.SIMPLE1"; + List<String> sqlStmts = Arrays.asList(sql1, sql2); + config.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); + Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); + + List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts); + SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream) + .collect(Collectors.toList()), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList())); + + StreamApplicationDescriptorImpl appDesc = new StreamApplicationDescriptorImpl(streamApp -> { },samzaConfig); + QueryTranslator translator = new QueryTranslator(appDesc, samzaSqlApplicationConfig); + + translator.translate(queryInfo.get(0), appDesc, 0); + translator.translate(queryInfo.get(1), appDesc, 1); + OperatorSpecGraph specGraph = appDesc.getOperatorSpecGraph(); + + StreamConfig streamConfig = new StreamConfig(samzaConfig); + String inputStreamId = specGraph.getInputOperators().keySet().stream().findFirst().get(); + String inputSystem = streamConfig.getSystem(inputStreamId); + String inputPhysicalName = 
streamConfig.getPhysicalName(inputStreamId); + String outputStreamId1 = specGraph.getOutputStreams().keySet().stream().findFirst().get(); + String outputSystem1 = streamConfig.getSystem(outputStreamId1); + String outputPhysicalName1 = streamConfig.getPhysicalName(outputStreamId1); + String outputStreamId2 = specGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get(); + String outputSystem2 = streamConfig.getSystem(outputStreamId2); + String outputPhysicalName2 = streamConfig.getPhysicalName(outputStreamId2); + + Assert.assertEquals(2, specGraph.getOutputStreams().size()); + Assert.assertEquals("testavro", outputSystem1); + Assert.assertEquals("SIMPLE2", outputPhysicalName1); + Assert.assertEquals("testavro", outputSystem2); + Assert.assertEquals("SIMPLE3", outputPhysicalName2); + + Assert.assertEquals(1, specGraph.getInputOperators().size()); + Assert.assertEquals("testavro", inputSystem); + Assert.assertEquals("SIMPLE1", inputPhysicalName); + } + + @Test + public void testTranslateMultiSql() { + Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10); + String sql1 = "Insert into testavro.simpleOutputTopic select * from testavro.SIMPLE1"; + String sql2 = "Insert into testavro.SIMPLE3 select * from testavro.SIMPLE2"; + List<String> sqlStmts = Arrays.asList(sql1, sql2); + config.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); + Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); + + List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts); + SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream) + .collect(Collectors.toList()), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList())); + + StreamApplicationDescriptorImpl appDesc = new 
StreamApplicationDescriptorImpl(streamApp -> { },samzaConfig); + QueryTranslator translator = new QueryTranslator(appDesc, samzaSqlApplicationConfig); + + translator.translate(queryInfo.get(0), appDesc, 0); + translator.translate(queryInfo.get(1), appDesc, 1); + OperatorSpecGraph specGraph = appDesc.getOperatorSpecGraph(); + + StreamConfig streamConfig = new StreamConfig(samzaConfig); + String inputStreamId1 = specGraph.getInputOperators().keySet().stream().findFirst().get(); + String inputSystem1 = streamConfig.getSystem(inputStreamId1); + String inputPhysicalName1 = streamConfig.getPhysicalName(inputStreamId1); + String inputStreamId2 = specGraph.getInputOperators().keySet().stream().skip(1).findFirst().get(); + String inputSystem2 = streamConfig.getSystem(inputStreamId2); + String inputPhysicalName2 = streamConfig.getPhysicalName(inputStreamId2); + + String outputStreamId1 = specGraph.getOutputStreams().keySet().stream().findFirst().get(); + String outputSystem1 = streamConfig.getSystem(outputStreamId1); + String outputPhysicalName1 = streamConfig.getPhysicalName(outputStreamId1); + String outputStreamId2 = specGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get(); + String outputSystem2 = streamConfig.getSystem(outputStreamId2); + String outputPhysicalName2 = streamConfig.getPhysicalName(outputStreamId2); + + Assert.assertEquals(2, specGraph.getOutputStreams().size()); + Assert.assertEquals("testavro", outputSystem1); + Assert.assertEquals("simpleOutputTopic", outputPhysicalName1); + Assert.assertEquals("testavro", outputSystem2); + Assert.assertEquals("SIMPLE3", outputPhysicalName2); + + Assert.assertEquals(2, specGraph.getInputOperators().size()); + Assert.assertEquals("testavro", inputSystem1); + Assert.assertEquals("SIMPLE1", inputPhysicalName1); + Assert.assertEquals("testavro", inputSystem2); + Assert.assertEquals("SIMPLE2", inputPhysicalName2); + } + + @Test public void testTranslateComplex() { Map<String, String> config = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10); config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, @@ -102,8 +243,8 @@ public class TestQueryTranslator { List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts); SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config), queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream) - .collect(Collectors.toSet()), - queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet())); + .collect(Collectors.toList()), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList())); StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig); QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig); @@ -139,8 +280,8 @@ public class TestQueryTranslator { List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts); SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config), queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream) - .collect(Collectors.toSet()), - queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet())); + .collect(Collectors.toList()), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList())); StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig); QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig); @@ -179,8 +320,8 @@ public class TestQueryTranslator { List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts); SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config), 
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream) - .collect(Collectors.toSet()), - queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet())); + .collect(Collectors.toList()), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList())); StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig); QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig); @@ -204,8 +345,8 @@ public class TestQueryTranslator { List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts); SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config), queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream) - .collect(Collectors.toSet()), - queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet())); + .collect(Collectors.toList()), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList())); StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig); QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig); @@ -229,8 +370,8 @@ public class TestQueryTranslator { List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts); SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config), queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream) - .collect(Collectors.toSet()), - queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet())); + .collect(Collectors.toList()), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList())); StreamApplicationDescriptorImpl streamAppDesc = new 
StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig); QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig); @@ -254,8 +395,8 @@ public class TestQueryTranslator { List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts); SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config), queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream) - .collect(Collectors.toSet()), - queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet())); + .collect(Collectors.toList()), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList())); StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig); QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig); @@ -276,8 +417,8 @@ public class TestQueryTranslator { List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts); SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config), queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream) - .collect(Collectors.toSet()), - queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet())); + .collect(Collectors.toList()), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList())); StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig); QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig); @@ -300,8 +441,8 @@ public class TestQueryTranslator { List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts); SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config), 
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream) - .collect(Collectors.toSet()), - queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet())); + .collect(Collectors.toList()), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList())); StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig); QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig); @@ -325,8 +466,8 @@ public class TestQueryTranslator { List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts); SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config), queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream) - .collect(Collectors.toSet()), - queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet())); + .collect(Collectors.toList()), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList())); StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig); QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig); @@ -349,8 +490,8 @@ public class TestQueryTranslator { List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts); SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config), queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream) - .collect(Collectors.toSet()), - queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet())); + .collect(Collectors.toList()), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList())); StreamApplicationDescriptorImpl streamAppDesc = new 
StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig); QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig); @@ -373,8 +514,8 @@ public class TestQueryTranslator { List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts); SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config), queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream) - .collect(Collectors.toSet()), - queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet())); + .collect(Collectors.toList()), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList())); StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig); QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig); @@ -397,8 +538,8 @@ public class TestQueryTranslator { List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts); SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config), queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream) - .collect(Collectors.toSet()), - queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet())); + .collect(Collectors.toList()), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList())); StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig); QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig); @@ -421,8 +562,8 @@ public class TestQueryTranslator { List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts); SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config), 
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream) - .collect(Collectors.toSet()), - queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet())); + .collect(Collectors.toList()), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList())); StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig); QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig); @@ -449,8 +590,8 @@ public class TestQueryTranslator { List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts); SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config), queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream) - .collect(Collectors.toSet()), - queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet())); + .collect(Collectors.toList()), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList())); StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig); QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig); @@ -474,8 +615,8 @@ public class TestQueryTranslator { List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts); SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config), queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream) - .collect(Collectors.toSet()), - queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet())); + .collect(Collectors.toList()), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList())); StreamApplicationDescriptorImpl streamAppDesc = new 
StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig); QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig); @@ -541,8 +682,8 @@ public class TestQueryTranslator { List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts); SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config), queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream) - .collect(Collectors.toSet()), - queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet())); + .collect(Collectors.toList()), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList())); StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig); QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig); @@ -609,8 +750,8 @@ public class TestQueryTranslator { List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts); SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config), queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream) - .collect(Collectors.toSet()), - queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet())); + .collect(Collectors.toList()), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList())); StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig); QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig); @@ -676,8 +817,8 @@ public class TestQueryTranslator { List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts); SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config), 
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream) - .collect(Collectors.toSet()), - queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet())); + .collect(Collectors.toList()), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList())); StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig); QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig); @@ -706,8 +847,8 @@ public class TestQueryTranslator { List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts); SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config), queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream) - .collect(Collectors.toSet()), - queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet())); + .collect(Collectors.toList()), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList())); StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig); QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig); http://git-wip-us.apache.org/repos/asf/samza/blob/ff607cb6/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java index 51cb1a9..3fc5750 100644 --- a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java +++ b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java @@ -138,7 +138,7 @@ public class 
TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness { @Test public void testEndToEndMultiSqlStmts() { - int numMessages = 20; + int numMessages = 4; TestAvroSystemFactory.messages.clear(); Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages); String sql1 = "Insert into testavro.simpleOutputTopic select * from testavro.SIMPLE1"; @@ -205,7 +205,7 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness { @Test public void testEndToEndFanOut() { - int numMessages = 20; + int numMessages = 4; TestAvroSystemFactory.messages.clear(); Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages); String sql1 = "Insert into testavro.SIMPLE2 select * from testavro.SIMPLE1";
